Connection_e1.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658
  1. """
  2. """
  3. # from hub import methods, Global
  4. import struct
  5. import asyncio
  6. import time
  7. import socket
  8. import sys
  9. import importlib
  10. # --- for linux
  11. # sys.path.append('/home/sri/repositories/repositories/SRI-DINO.Server-py/sri-server-bg03')
  12. # sys.path.append('/home/sri/repositories/repositories/SRI-DINO.Server-py/sri-pysdk')
  13. # --- for windows
  14. sys.path.append(r'C:\SRI-DINO.Server-py\sri-server-bg03')
  15. sys.path.append(r'C:\SRI-DINO.Server-py\sri-pysdk')
  16. protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
  17. methods = importlib.import_module(f"xlib")
  18. clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  19. serial_rid_dict = {
  20. '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
  21. '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
  22. 'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
  23. 'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
  24. 'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004, # 正在使用
  25. }
  26. account_uid_dict = {
  27. 'ego': 3
  28. }
  29. # live_relationship = {} # {<id-1>id-2>: True}
  30. class SRIConnection(asyncio.Protocol):
  31. """"""
  32. head_sequence = '<hh' # 字节序规则
  33. head_size = struct.calcsize(head_sequence)
  34. message_data = b''
  35. def __init__(self):
  36. self.lock = asyncio.Lock() # 初始化锁
  37. def connection_made(self, client):
  38. """
  39. 建立客户端连接
  40. """
  41. peername = client.get_extra_info('peername')
  42. # self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 12700130000 客户端id
  43. self.connection_id = int(f"{peername[0].replace('.', '')}")
  44. self.client = client
  45. # self.data = b''
  46. self.client_type = None
  47. self.client_info = None
  48. self.update_at = methods.now_ts()
  49. clients[self.connection_id] = self
  50. # 获取底层 socket
  51. # sock = self.client.get_extra_info('socket')
  52. # 查看缓冲区大小
  53. # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
  54. # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
  55. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Receive buffer size: {recv_buffer_size}")
  56. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Send buffer size: {send_buffer_size}")
  57. def connection_lost(self, exc):
  58. """
  59. 关闭连接
  60. """
  61. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭")
  62. # --- 处理车端掉线,通知所有舱端
  63. if self.client_type == 'vehicle':
  64. """
  65. SCDelRobot: 消息体
  66. SCDelRobot.peer: int32
  67. SCDelRobot.egotype: int32
  68. """
  69. o2 = protobuf.SCDelRobot()
  70. o2.peer = self.client_info.get('connection_id') # 车端id
  71. o2.egotype = self.client_info.get('egotype')
  72. re_command_id = protobuf.SC_NotifyDel # 4017
  73. re_body_length = o2.ByteSize()
  74. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  75. re_body_data = o2.SerializeToString()
  76. re_send_data = re_head_data + re_body_data
  77. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {re_command_id}")
  78. for item in clients.values():
  79. if item.client_type == 'cockpit':
  80. item.client.write(re_send_data)
  81. # --- 处理舱端掉线,通知所有车端
  82. if self.client_type == 'cockpit':
  83. """
  84. Leave: 消息体
  85. Leave.peer: int32(车端rid)
  86. Leave.egotype: int32
  87. """
  88. o2 = protobuf.Leave()
  89. o2.peer = self.client_info.get('connection_id') # 舱端id
  90. o2.egotype = 1 # 客户端类型
  91. re_command_id = protobuf.SC_NotifyLeave # 4014
  92. re_body_length = o2.ByteSize()
  93. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  94. re_body_data = o2.SerializeToString()
  95. re_send_data = re_head_data + re_body_data
  96. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  97. for item in clients.values():
  98. if item.client_type == 'vehicle':
  99. item.client.write(re_send_data)
  100. # --- clean
  101. if self.connection_id in clients:
  102. del clients[self.connection_id]
  103. # --- clean live_relationship
  104. # keys = [key for key in live_relationship.keys() if self.connection_id in key]
  105. # for key in keys:
  106. # del live_relationship[key]
  107. def data_received(self, data):
  108. """
  109. 消息处理
  110. """
  111. asyncio.create_task(self.handle_data(data))
  112. async def handle_data(self, data):
  113. """
  114. 消息处理
  115. """
  116. # --- before
  117. # loop = asyncio.get_running_loop()
  118. # await loop.run_in_executor(None, self.process_data, data)
  119. # async with self.lock: # 加入锁,确保只有一个任务在处理数据
  120. # await self.process_data(data)
  121. async with self.lock: # 加入锁,确保只有一个任务在处理数据
  122. loop = asyncio.get_running_loop()
  123. await loop.run_in_executor(None, self.process_data, data) # 调用非异步函数
  124. def process_data(self, data):
  125. """
  126. 消息处理
  127. """
  128. try:
  129. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}")
  130. self.message_data += data # 执行了这个以后
  131. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1")
  132. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2")
  133. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3")
  134. # count = 0
  135. while True:
  136. # --- print
  137. # count += 1
  138. # methods.debug_log(f"{self.connection_id}|SRIConnection123",
  139. # f"---------------------------------- while count: {count}")
  140. # 确保有足够的字节来处理消息头
  141. if len(self.message_data) < self.head_size:
  142. break
  143. # 获取消息头
  144. head_data = self.message_data[:self.head_size]
  145. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  146. methods.debug_log(f'{self.connection_id}|SRIConnection130',
  147. f"command_id: {command_id}, body_length: {body_length}")
  148. # 检查命令ID是否有效
  149. if not (2000 <= command_id < 5000):
  150. self.message_data = b'' # 清空无效数据
  151. break
  152. # 检查完整消息是否接收完毕
  153. total_length = self.head_size + body_length
  154. if len(self.message_data) < total_length:
  155. break # 数据不完整,等待更多数据
  156. # 调用相应处理方法
  157. method = getattr(self, f'message{command_id}', None)
  158. if method:
  159. method(self.message_data[self.head_size:total_length]) # 处理完整消息
  160. else:
  161. methods.debug_log(f"{self.connection_id}|SRIConnectionError",
  162. f"No handler for command_id: {command_id}")
  163. # 移除已处理的消息
  164. self.message_data = self.message_data[total_length:]
  165. except Exception as exception:
  166. methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
  167. methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")
  168. def message2008(self, body_data):
  169. # methods.debug_log(f"{self.connection_id}|SRIConnection80", f"message: 2008")
  170. pass
  171. def message2009(self, body_data):
  172. # --- 监听 2009
  173. """
  174. CSAdd: 消息体
  175. CSAdd.serial: string
  176. CSAdd.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  177. CSAdd.name: string
  178. """
  179. object0 = protobuf.CSAdd()
  180. object0.ParseFromString(body_data)
  181. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#serial: {object0.serial}")
  182. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {object0.name}")
  183. methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {object0.type}")
  184. # --- update ---
  185. self.client_type = 'vehicle'
  186. self.client_info = {
  187. 'connection_id': self.connection_id,
  188. 'rid': serial_rid_dict.get(object0.serial),
  189. 'name': object0.name,
  190. 'serial': object0.serial,
  191. 'egotype': 2, # 客户端类型
  192. }
  193. """
  194. SCAddRobot: 消息体
  195. SCAddRobot.robot: Robot
  196. Robot: 消息体
  197. Robot.rid: int32
  198. Robot.name: string
  199. Robot.type: int32
  200. Robot.state: RobotState Offline Online Busy
  201. """
  202. o1 = protobuf.Robot()
  203. # o1.rid = self.client_info.get('rid')
  204. o1.rid = self.client_info.get('connection_id')
  205. o1.name = object0.name
  206. o1.type = 2 # 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  207. o1.state = protobuf.Robot.Online
  208. o2 = protobuf.SCAddRobot()
  209. o2.robot.CopyFrom(o1)
  210. re_command_id = protobuf.SC_NotifyAdd # 4016
  211. re_body_length = o2.ByteSize()
  212. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  213. re_body_data = o2.SerializeToString()
  214. re_send_data = re_head_data + re_body_data
  215. # --- send 4016 发送全部舱端
  216. for item in clients.values():
  217. if item.client_type == 'cockpit':
  218. methods.debug_log(f"{self.connection_id}|SRIConnection136", f"re_command_id: {re_command_id}")
  219. item.client.write(re_send_data)
  220. # --- send 4007 todo 凯强说并未用到
  221. # o1 = protobuf.SCAdd()
  222. # o1.ret = True
  223. # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车
  224. # o1.name = object0.name
  225. # re_command_id = protobuf.SC_Add # 4007
  226. # re_body_length = o1.ByteSize()
  227. # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  228. # re_body_data = o1.SerializeToString()
  229. # re_send_data = re_head_data + re_body_data
  230. # methods.debug_log(f"{self.connection_id} | SRIConnection150", f"re_command_id: {re_command_id}")
  231. # self.client.write(re_send_data)
  232. def message2000(self, body_data):
  233. # --- 解析消息体
  234. object = protobuf.CSSign()
  235. object.ParseFromString(body_data)
  236. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {object.account}")
  237. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#password: {object.password}")
  238. # --- update ---
  239. self.client_type = 'cockpit'
  240. self.client_info = {
  241. 'connection_id': self.connection_id,
  242. 'uid': 3, # 对应数据库里的ego的id
  243. 'name': 'ego', # 对应数据库里
  244. 'egotype': 1, # 舱端类型
  245. }
  246. # --- send 4000
  247. object = protobuf.SCSign()
  248. object.ret = True
  249. # object.uid = self.client_info.get('uid')
  250. object.uid = self.client_info.get('connection_id')
  251. object.name = self.client_info.get('name')
  252. re_command_id = protobuf.SC_Sign # 4000
  253. re_body_length = object.ByteSize()
  254. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  255. re_body_data = object.SerializeToString()
  256. re_send_data = re_head_data + re_body_data
  257. methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}")
  258. self.client.write(re_send_data)
  259. def message2010(self, body_data):
  260. # --- send 4008 发送全部车端信息列表
  261. """
  262. Robot: 消息体
  263. Robot.rid: string
  264. Robot.name: string
  265. Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  266. Robot.state: enum Offline Online Busy
  267. """
  268. # --- 获取全部在线车辆列表
  269. o2 = protobuf.SCRobot()
  270. for item in clients.values():
  271. if item.client_type and item.client_type == 'vehicle':
  272. o1 = protobuf.Robot()
  273. # o1.rid = item.client_info.get('rid')
  274. o1.rid = item.client_info.get('connection_id')
  275. o1.name = item.client_info.get('name')
  276. o1.type = 2 # EgoType::Car
  277. o1.state = protobuf.Robot.RobotState.Value('Online')
  278. o2.robot.add().CopyFrom(o1)
  279. re_command_id = protobuf.SC_Robot # 4008
  280. re_body_length = o2.ByteSize()
  281. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  282. re_body_data = o2.SerializeToString()
  283. re_send_data = re_head_data + re_body_data
  284. methods.debug_log(f"{self.connection_id}|SRIConnection306", f"re_command_id: {re_command_id}")
  285. self.client.write(re_send_data)
  286. def message2001(self, body_data):
  287. # --- 解析消息体
  288. """
  289. CSReq: 消息体
  290. CSReq.peer: int32(rid,车端唯一标识)
  291. CSReq.index: int32(相机位置,RenderPosition)
  292. CSReq.egotype: int32(终端类型,舱端/车端)
  293. """
  294. o1 = protobuf.CSReq()
  295. o1.ParseFromString(body_data)
  296. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.peer}")
  297. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#index: {o1.index}")
  298. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#uid: {self.client_info.get('uid')}")
  299. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#rid: {self.client_info.get('rid')}")
  300. # --- send 4009 指定车端
  301. for item in clients.values():
  302. if item.client_info.get('connection_id') == o1.peer:
  303. """
  304. CSReq: 消息体
  305. CSReq.peer: int32(rid,车端唯一标识)
  306. CSReq.index: int32(相机位置,RenderPosition)
  307. CSReq.egotype: int32(终端类型,舱端/车端)
  308. """
  309. o2 = protobuf.CSReq()
  310. o2.peer = self.client_info.get('connection_id') # 舱端id
  311. o2.index = o1.index
  312. o2.egotype = o1.egotype
  313. re_command_id = protobuf.SC_NotifyReq # 4009
  314. re_body_length = o2.ByteSize()
  315. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  316. re_body_data = o2.SerializeToString()
  317. re_send_data = re_head_data + re_body_data
  318. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  319. item.client.write(re_send_data)
  320. def message2002(self, body_data):
  321. # --- 解析消息体
  322. """
  323. CSRep: 消息体
  324. CSRep.desc: VideoDesc
  325. CSRep.peer: int32
  326. CSRep.index: int32
  327. CSRep.egotype: int32
  328. """
  329. o1 = protobuf.CSRep()
  330. o1.ParseFromString(body_data)
  331. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {o1.peer}")
  332. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {self.client_info.get('uid')}")
  333. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {self.client_info.get('rid')}")
  334. # --- send 4010 指定舱端
  335. for item in clients.values():
  336. if item.client_info.get('connection_id') == o1.peer:
  337. o2 = protobuf.CSRep()
  338. o2.desc = o1.desc
  339. o2.peer = self.client_info.get('connection_id') # 车端id
  340. o2.index = o1.index
  341. o2.egotype = o1.egotype
  342. re_command_id = protobuf.SC_NotifyRep # 4010
  343. re_body_length = o2.ByteSize()
  344. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  345. re_body_data = o2.SerializeToString()
  346. re_send_data = re_head_data + re_body_data
  347. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  348. item.client.write(re_send_data)
  349. def message2004(self, body_data):
  350. # --- 解析消息体
  351. """
  352. Offer: 消息体
  353. Offer.index: int32
  354. Offer.peer: int32(车端rid)
  355. Offer.type: string
  356. Offer.sdp: string
  357. """
  358. object = protobuf.Offer()
  359. object.ParseFromString(body_data)
  360. methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {object.peer}")
  361. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {self.client_info.get('uid')}")
  362. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {self.client_info.get('rid')}")
  363. # --- send 4012 指定车端
  364. for item in clients.values():
  365. if item.client_info.get('connection_id') == object.peer:
  366. o1 = protobuf.Offer()
  367. o1.index = object.index
  368. o1.peer = self.client_info.get('connection_id') # 舱端id
  369. o1.type = object.type
  370. o1.sdp = object.sdp
  371. re_command_id = protobuf.SC_NotifyOffer # 4012
  372. re_body_length = o1.ByteSize()
  373. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  374. re_body_data = o1.SerializeToString()
  375. re_send_data = re_head_data + re_body_data
  376. methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}")
  377. item.client.write(re_send_data)
  378. def message2005(self, body_data):
  379. # --- 解析消息体
  380. """
  381. Answer: 消息体
  382. Answer.index: int32
  383. Answer.peer: int32(舱端uid)
  384. Answer.type: string
  385. Answer.sdp: string
  386. """
  387. object = protobuf.Answer()
  388. object.ParseFromString(body_data)
  389. methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {object.peer}")
  390. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {self.client_info.get('uid')}")
  391. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {self.client_info.get('rid')}")
  392. # --- send 4011 指定舱端
  393. for item in clients.values():
  394. if item.client_info.get('connection_id') == object.peer:
  395. o1 = protobuf.Answer()
  396. o1.index = object.index
  397. o1.peer = self.client_info.get('connection_id') # 车端id
  398. o1.type = object.type
  399. o1.sdp = object.sdp
  400. re_command_id = protobuf.SC_NotifyAnswer # 4011
  401. re_body_length = o1.ByteSize()
  402. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  403. re_body_data = o1.SerializeToString()
  404. re_send_data = re_head_data + re_body_data
  405. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}")
  406. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"#index: {o1.index}")
  407. item.client.write(re_send_data)
  408. def message2006(self, body_data):
  409. # --- 解析消息体
  410. """
  411. Candidate: 消息体
  412. Candidate.index: int32
  413. Candidate.peer: int32(车端rid)
  414. Candidate.type: string
  415. Candidate.candidate: string
  416. Candidate.sdpMLineIndex: int32
  417. Candidate.sdpMid: string
  418. Candidate.egotype: int32
  419. """
  420. object = protobuf.Candidate()
  421. object.ParseFromString(body_data)
  422. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {object.peer}")
  423. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {self.client_info.get('uid')}")
  424. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {self.client_info.get('rid')}")
  425. for item in clients.values():
  426. # --- send 4013 舱端到车端
  427. if item.client_info.get('connection_id') == object.peer:
  428. o1 = protobuf.Candidate()
  429. o1.index = object.index
  430. o1.peer = self.client_info.get('connection_id') # 舱端id,
  431. o1.type = object.type
  432. o1.candidate = object.candidate
  433. o1.sdpMLineIndex = object.sdpMLineIndex
  434. o1.sdpMid = object.sdpMid
  435. o1.egotype = object.egotype
  436. re_command_id = protobuf.SC_NotifyCandidate # 4013
  437. re_body_length = o1.ByteSize()
  438. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  439. re_body_data = o1.SerializeToString()
  440. re_send_data = re_head_data + re_body_data
  441. methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}")
  442. item.client.write(re_send_data)
  443. # --- send 4013 车端到舱端
  444. if item.client_info.get('connection_id') == object.peer:
  445. o1 = protobuf.Candidate()
  446. o1.index = object.index
  447. o1.peer = self.client_info.get('connection_id') # 车端id,
  448. o1.type = object.type
  449. o1.candidate = object.candidate
  450. o1.sdpMLineIndex = object.sdpMLineIndex
  451. o1.sdpMid = object.sdpMid
  452. o1.egotype = object.egotype
  453. re_command_id = protobuf.SC_NotifyCandidate # 4013
  454. re_body_length = o1.ByteSize()
  455. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  456. re_body_data = o1.SerializeToString()
  457. re_send_data = re_head_data + re_body_data
  458. methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}")
  459. # methods.debug_log(f"{self.connection_id}|SRIConnection426", f"#candidate.out: {o1.candidate}")
  460. item.client.write(re_send_data)
  461. def message2007(self, body_data):
  462. # --- 解析消息体
  463. """
  464. Leave: 消息体
  465. Leave.peer: int32(车端rid)
  466. Leave.egotype: int32
  467. """
  468. o1 = protobuf.Leave()
  469. o1.ParseFromString(body_data)
  470. methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {o1.peer}")
  471. # --- send 4009 指定车端
  472. for item in clients.values():
  473. if item.client_info.get('connection_id') == o1.peer:
  474. """
  475. Leave: 消息体
  476. Leave.peer: int32(车端rid)
  477. Leave.egotype: int32
  478. """
  479. o2 = protobuf.Leave()
  480. o2.peer = self.client_info.get('connection_id') # 舱端id
  481. o2.egotype = o1.egotype
  482. re_command_id = protobuf.SC_NotifyLeave # 4014
  483. re_body_length = o2.ByteSize()
  484. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  485. re_body_data = o2.SerializeToString()
  486. re_send_data = re_head_data + re_body_data
  487. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  488. item.client.write(re_send_data)
  489. def message2014(self, body_data):
  490. # --- 解析消息体
  491. """
  492. message CSState
  493. {
  494. UserState state=1;
  495. int32 uid=2;
  496. };
  497. """
  498. o1 = protobuf.CSState()
  499. o1.ParseFromString(body_data)
  500. methods.debug_log(f"{self.connection_id}|SRIConnection530", f"----------------- #state: {o1.state}")
  501. methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#uid: {o1.uid}")
  502. # --- send 4016 发送全部舱端
  503. for item in clients.values():
  504. if item.client_type == 'cockpit':
  505. """
  506. message SCState
  507. {
  508. UserState state=1;
  509. int32 uid=2;
  510. };
  511. """
  512. o2 = protobuf.SCState()
  513. o2.state = o1.state
  514. # o2.uid = self.client_info.get('uid')
  515. o2.uid = o1.uid # todo 文磊这边发送过来的车端id
  516. re_command_id = protobuf.SC_State # 4022
  517. re_body_length = o2.ByteSize()
  518. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  519. re_body_data = o2.SerializeToString()
  520. re_send_data = re_head_data + re_body_data
  521. methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}")
  522. item.client.write(re_send_data)
  523. # --- send 6011 指发送车端操作用户的id
  524. for item in clients.values():
  525. # if item.client_info.get('connection_id') == o1.uid:
  526. if item.client_info.get('connection_id') == 10101089:
  527. """
  528. Leave: 消息体
  529. Leave.peer: int32(车端rid)
  530. Leave.egotype: int32
  531. """
  532. o3 = protobuf.UserActivityInfo()
  533. o3.user_uuid = 'SSGGSSEEFFHHWWSS' # 用户uuid
  534. o3.cockpit_id = self.connection_id # 舱端id
  535. o3.vehicle_id = 112233 # 车端id
  536. re_command_id = protobuf.S2V_SendUserInfo # 6011
  537. re_body_length = o3.ByteSize()
  538. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  539. re_body_data = o3.SerializeToString()
  540. re_send_data = re_head_data + re_body_data
  541. methods.debug_log(f"{self.connection_id}|SRIConnection611", f"re_command_id: {re_command_id}")
  542. # methods.debug_log(f"{self.connection_id}|SRIConnection611", f"o3.user_uuid: {o3.user_uuid}")
  543. # methods.debug_log(f"{self.connection_id}|SRIConnection611", f"o3.cockpit_id: {o3.cockpit_id}")
  544. # methods.debug_log(f"{self.connection_id}|SRIConnection611", f"o3.vehicle_id: {o3.vehicle_id}")
  545. item.client.write(re_send_data)
  546. @staticmethod
  547. async def check_clients():
  548. """
  549. 剔除掉线连接
  550. """
  551. count = 0
  552. while True:
  553. count += 1
  554. print(f"#count: {count}", flush=True)
  555. now_at = methods.now_ts()
  556. for connection_id in list(clients.keys()):
  557. object = clients.get(connection_id)
  558. print(f"#client: {object.client}, #update_at: {object.update_at}, #client_type: {object.client_type}",
  559. flush=True)
  560. await asyncio.sleep(3) # 发送消息的间隔时间
  561. @staticmethod
  562. async def run():
  563. """
  564. """
  565. # --- define ---
  566. loop = asyncio.get_running_loop()
  567. server = await loop.create_server(
  568. lambda: SRIConnection(),
  569. '0.0.0.0', 20917
  570. )
  571. # --- start ---
  572. async with server:
  573. print("Server listening on 0.0.0.0:20917", flush=True)
  574. # await loop.create_task(SRIConnection.check_clients())
  575. await server.serve_forever()
  576. if __name__ == '__main__':
  577. asyncio.run(SRIConnection.run())