Connection_e1.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699
  1. """
  2. """
  3. from hub import methods, Global, protobuf
  4. from werkzeug.security import check_password_hash
  5. import struct
  6. import asyncio
  7. import time
  8. import socket
  9. clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  10. serial_rid_dict = {
  11. '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
  12. '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
  13. 'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
  14. 'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
  15. 'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004, # 正在使用
  16. }
  17. account_uid_dict = {
  18. 'ego': 3
  19. }
  20. # live_relationship = {} # {<id-1>id-2>: True}
  21. class SRIConnection(asyncio.Protocol):
  22. """"""
  23. head_sequence = '<hh' # 字节序规则
  24. head_size = struct.calcsize(head_sequence)
  25. message_data = b''
  26. def __init__(self):
  27. self.lock = asyncio.Lock() # 初始化锁
  28. def connection_made(self, client):
  29. """
  30. 建立客户端连接
  31. """
  32. peername = client.get_extra_info('peername')
  33. # self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 客户端id
  34. # self.connection_id = int(f"{peername[1]}") # 客户端id(公网情况)
  35. self.connection_id = int(f"{peername[0].replace('.', '')}") # 客户端id(局域网情况)
  36. self.client = client
  37. # self.data = b''
  38. self.client_type = None
  39. self.client_info = None
  40. self.update_at = methods.now_ts()
  41. clients[self.connection_id] = self
  42. # --- get VehicleStatus ---
  43. unique_dict = {'name': 'VehicleStatus'}
  44. item = Global.mdb.get_one('GlobalVariable', unique_dict)
  45. data = item.get('args', {})
  46. # --- set VehicleStatus ---
  47. data[str(self.connection_id)] = 2
  48. update_dict = {'args': data}
  49. Global.mdb.update_one('GlobalVariable', unique_dict, update_dict)
  50. # 获取底层 socket
  51. # sock = self.client.get_extra_info('socket')
  52. # 查看缓冲区大小
  53. # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
  54. # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
  55. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Receive buffer size: {recv_buffer_size}")
  56. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Send buffer size: {send_buffer_size}")
  57. def connection_lost(self, exc):
  58. """
  59. 关闭连接
  60. """
  61. methods.debug_log(f"{self.connection_id}|SRIConnection|65", f"连接已关闭")
  62. methods.debug_log(f"{self.connection_id}|SRIConnection|65", f"clients: len{clients}")
  63. # --- get VehicleStatus ---
  64. unique_dict = {'name': 'VehicleStatus'}
  65. item = Global.mdb.get_one('GlobalVariable', unique_dict)
  66. data = item.get('args', {})
  67. # --- set VehicleStatus ---
  68. data[str(self.connection_id)] = 1
  69. update_dict = {'args': data}
  70. Global.mdb.update_one('GlobalVariable', unique_dict, update_dict)
  71. # --- 处理车端掉线,通知所有舱端
  72. if self.client_type == 'vehicle':
  73. """
  74. SCDelRobot: 消息体
  75. SCDelRobot.peer: int32
  76. SCDelRobot.egotype: int32
  77. """
  78. o2 = protobuf.SCDelRobot()
  79. o2.peer = self.client_info.get('connection_id') # 车端id
  80. o2.egotype = self.client_info.get('egotype')
  81. re_command_id = protobuf.SC_NotifyDel # 4017
  82. re_body_length = o2.ByteSize()
  83. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  84. re_body_data = o2.SerializeToString()
  85. re_send_data = re_head_data + re_body_data
  86. methods.debug_log(f"{self.connection_id}|SRIConnection102", f"re_command_id: {re_command_id}")
  87. for item in clients.values():
  88. if item.client_type == 'cockpit':
  89. item.client.write(re_send_data)
  90. # --- 处理舱端掉线,通知所有车端
  91. if self.client_type == 'cockpit':
  92. """
  93. Leave: 消息体
  94. Leave.peer: int32(车端rid)
  95. Leave.egotype: int32
  96. """
  97. o2 = protobuf.Leave()
  98. o2.peer = self.client_info.get('connection_id') # 舱端id
  99. o2.egotype = 1 # 客户端类型
  100. re_command_id = protobuf.SC_NotifyLeave # 4014
  101. re_body_length = o2.ByteSize()
  102. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  103. re_body_data = o2.SerializeToString()
  104. re_send_data = re_head_data + re_body_data
  105. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  106. for item in clients.values():
  107. if item.client_type == 'vehicle':
  108. item.client.write(re_send_data)
  109. # --- clean
  110. if self.connection_id in clients:
  111. del clients[self.connection_id]
  112. # --- clean live_relationship
  113. # keys = [key for key in live_relationship.keys() if self.connection_id in key]
  114. # for key in keys:
  115. # del live_relationship[key]
  116. def data_received(self, data):
  117. """
  118. 消息处理
  119. """
  120. asyncio.create_task(self.handle_data(data))
  121. async def handle_data(self, data):
  122. """
  123. 消息处理
  124. """
  125. # --- before
  126. # loop = asyncio.get_running_loop()
  127. # await loop.run_in_executor(None, self.process_data, data)
  128. # async with self.lock: # 加入锁,确保只有一个任务在处理数据
  129. # await self.process_data(data)
  130. async with self.lock: # 加入锁,确保只有一个任务在处理数据
  131. loop = asyncio.get_running_loop()
  132. await loop.run_in_executor(None, self.process_data, data) # 调用非异步函数
  133. def process_data(self, data):
  134. """
  135. 消息处理
  136. """
  137. try:
  138. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}")
  139. self.message_data += data # 执行了这个以后
  140. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1")
  141. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2")
  142. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3")
  143. # count = 0
  144. while True:
  145. # --- print
  146. # count += 1
  147. # methods.debug_log(f"{self.connection_id}|SRIConnection123",
  148. # f"---------------------------------- while count: {count}")
  149. # 确保有足够的字节来处理消息头
  150. if len(self.message_data) < self.head_size:
  151. break
  152. # 获取消息头
  153. head_data = self.message_data[:self.head_size]
  154. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  155. if command_id not in [2008]:
  156. methods.debug_log(f'{self.connection_id}|SRIConnection.166',
  157. f"command_id: {command_id}, body_length: {body_length}")
  158. # else:
  159. # methods.debug_log(f'{self.connection_id}|SRIConnection.169',
  160. # f"command_id: {command_id}, body_length: {body_length}")
  161. # 检查命令ID是否有效
  162. if not (1000 < command_id < 9000):
  163. self.message_data = b'' # 清空无效数据
  164. break
  165. # 检查完整消息是否接收完毕
  166. total_length = self.head_size + body_length
  167. if len(self.message_data) < total_length:
  168. break # 数据不完整,等待更多数据
  169. # 调用相应处理方法
  170. method = getattr(self, f'message{command_id}', None)
  171. if method:
  172. method(self.message_data[self.head_size:total_length]) # 处理完整消息
  173. else:
  174. methods.debug_log(f"{self.connection_id}|SRIConnectionError",
  175. f"No handler for command_id: {command_id}")
  176. # 移除已处理的消息
  177. self.message_data = self.message_data[total_length:]
  178. except Exception as exception:
  179. methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
  180. methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")
  181. def message2008(self, body_data):
  182. # methods.debug_log(f"{self.connection_id}|SRIConnection80", f"message: 2008")
  183. pass
  184. def message2009(self, body_data):
  185. # --- 监听 2009
  186. """
  187. CSAdd: 消息体
  188. CSAdd.serial: string
  189. CSAdd.type: int32 EgoType::Car | 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  190. CSAdd.name: string
  191. """
  192. object0 = protobuf.CSAdd()
  193. object0.ParseFromString(body_data)
  194. methods.debug_log(f"{self.connection_id}|SRIConnection219", f"#serial: {object0.serial}")
  195. # methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {object0.name}")
  196. # methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {object0.type}")
  197. # --- update ---
  198. self.client_type = 'vehicle'
  199. self.client_info = {
  200. 'connection_id': self.connection_id,
  201. 'rid': serial_rid_dict.get(object0.serial),
  202. 'name': object0.name,
  203. 'serial': object0.serial,
  204. 'egotype': 2, # 客户端类型
  205. }
  206. """
  207. SCAddRobot: 消息体
  208. SCAddRobot.robot: Robot
  209. Robot: 消息体
  210. Robot.rid: int32
  211. Robot.name: string
  212. Robot.type: int32
  213. Robot.state: RobotState Offline Online Busy
  214. """
  215. o1 = protobuf.Robot()
  216. # o1.rid = self.client_info.get('rid')
  217. o1.rid = self.client_info.get('connection_id')
  218. o1.name = object0.name
  219. o1.type = 2 # 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  220. o1.state = protobuf.Robot.Online
  221. o2 = protobuf.SCAddRobot()
  222. o2.robot.CopyFrom(o1)
  223. re_command_id = protobuf.SC_NotifyAdd # 4016
  224. re_body_length = o2.ByteSize()
  225. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  226. re_body_data = o2.SerializeToString()
  227. re_send_data = re_head_data + re_body_data
  228. # --- send 4016 发送全部舱端
  229. for item in clients.values():
  230. if item.client_type == 'cockpit':
  231. methods.debug_log(f"{self.connection_id}|SRIConnection136", f"re_command_id: {re_command_id}")
  232. item.client.write(re_send_data)
  233. # --- send 4007 todo 凯强说并未用到
  234. # o1 = protobuf.SCAdd()
  235. # o1.ret = True
  236. # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车
  237. # o1.name = object0.name
  238. # re_command_id = protobuf.SC_Add # 4007
  239. # re_body_length = o1.ByteSize()
  240. # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  241. # re_body_data = o1.SerializeToString()
  242. # re_send_data = re_head_data + re_body_data
  243. # methods.debug_log(f"{self.connection_id} | SRIConnection150", f"re_command_id: {re_command_id}")
  244. # self.client.write(re_send_data)
  245. def message2000(self, body_data):
  246. # --- 解析消息体 2000
  247. object = protobuf.CSSign()
  248. object.ParseFromString(body_data)
  249. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {object.account}")
  250. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#password: {object.password}")
  251. # --- update ---
  252. ret = True
  253. if object.account == "Ego" and object.password != '123456':
  254. ret = False
  255. # --- check ---
  256. user = Global.mdb.get_one('UserInfo', {'username': object.account})
  257. name = ''
  258. uuid = ''
  259. if not user:
  260. ret = False
  261. elif not check_password_hash(user.get('password'), object.password):
  262. ret = False
  263. elif user.get('state') and int(user.get('state')) == 1:
  264. ret = False
  265. else:
  266. name = user.get('name')
  267. uuid = str(user.get('_id'))
  268. ret = True
  269. # --- update ---
  270. if ret:
  271. self.client_type = 'cockpit'
  272. self.client_info = {
  273. 'connection_id': self.connection_id,
  274. 'uid': 3, # 对应数据库里的ego的id
  275. 'name': object.account, # 对应数据库里
  276. 'egotype': 1, # 舱端类型
  277. 'user_uuid': uuid, # 用户id
  278. }
  279. # --- send 4000
  280. object = protobuf.SCSign()
  281. object.ret = ret # 返回结果
  282. object.uid = self.connection_id
  283. object.name = name # 人员名称
  284. object.user_uuid = uuid # 人员唯一标识
  285. re_command_id = protobuf.SC_Sign # 4000
  286. re_body_length = object.ByteSize()
  287. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  288. re_body_data = object.SerializeToString()
  289. re_send_data = re_head_data + re_body_data
  290. methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}")
  291. self.client.write(re_send_data)
  292. def message2010(self, body_data):
  293. # --- send 4008 发送全部车端信息列表
  294. """
  295. Robot: 消息体
  296. Robot.rid: string
  297. Robot.name: string
  298. Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  299. Robot.state: enum Offline Online Busy
  300. """
  301. # --- 获取全部在线车辆列表
  302. o2 = protobuf.SCRobot()
  303. for item in clients.values():
  304. if item.client_type and item.client_type == 'vehicle':
  305. o1 = protobuf.Robot()
  306. # o1.rid = item.client_info.get('rid')
  307. o1.rid = item.client_info.get('connection_id')
  308. o1.name = item.client_info.get('name')
  309. o1.type = 2 # EgoType::Car
  310. o1.state = protobuf.Robot.RobotState.Value('Online')
  311. o2.robot.add().CopyFrom(o1)
  312. re_command_id = protobuf.SC_Robot # 4008
  313. re_body_length = o2.ByteSize()
  314. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  315. re_body_data = o2.SerializeToString()
  316. re_send_data = re_head_data + re_body_data
  317. methods.debug_log(f"{self.connection_id}|SRIConnection306", f"re_command_id: {re_command_id}")
  318. self.client.write(re_send_data)
  319. def message2001(self, body_data):
  320. # --- 解析消息体
  321. """
  322. CSReq: 消息体
  323. CSReq.peer: int32(rid,车端唯一标识)
  324. CSReq.index: int32(相机位置,RenderPosition)
  325. CSReq.egotype: int32(终端类型,舱端/车端)
  326. """
  327. o1 = protobuf.CSReq()
  328. o1.ParseFromString(body_data)
  329. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.peer}")
  330. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#index: {o1.index}")
  331. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#uid: {self.client_info.get('uid')}")
  332. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#rid: {self.client_info.get('rid')}")
  333. # --- send 4009 指定车端
  334. for item in clients.values():
  335. if item.client_info.get('connection_id') == o1.peer:
  336. """
  337. CSReq: 消息体
  338. CSReq.peer: int32(rid,车端唯一标识)
  339. CSReq.index: int32(相机位置,RenderPosition)
  340. CSReq.egotype: int32(终端类型,舱端/车端)
  341. """
  342. o2 = protobuf.CSReq()
  343. o2.peer = self.client_info.get('connection_id') # 舱端id
  344. o2.index = o1.index
  345. o2.egotype = o1.egotype
  346. re_command_id = protobuf.SC_NotifyReq # 4009
  347. re_body_length = o2.ByteSize()
  348. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  349. re_body_data = o2.SerializeToString()
  350. re_send_data = re_head_data + re_body_data
  351. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  352. item.client.write(re_send_data)
  353. def message2002(self, body_data):
  354. # --- 解析消息体
  355. """
  356. CSRep: 消息体
  357. CSRep.desc: VideoDesc
  358. CSRep.peer: int32
  359. CSRep.index: int32
  360. CSRep.egotype: int32
  361. """
  362. o1 = protobuf.CSRep()
  363. o1.ParseFromString(body_data)
  364. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {o1.peer}")
  365. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {self.client_info.get('uid')}")
  366. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {self.client_info.get('rid')}")
  367. # --- send 4010 指定舱端
  368. for item in clients.values():
  369. if item.client_info.get('connection_id') == o1.peer:
  370. o2 = protobuf.CSRep()
  371. o2.desc = o1.desc
  372. o2.peer = self.client_info.get('connection_id') # 车端id
  373. o2.index = o1.index
  374. o2.egotype = o1.egotype
  375. re_command_id = protobuf.SC_NotifyRep # 4010
  376. re_body_length = o2.ByteSize()
  377. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  378. re_body_data = o2.SerializeToString()
  379. re_send_data = re_head_data + re_body_data
  380. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  381. item.client.write(re_send_data)
  382. def message2004(self, body_data):
  383. # --- 解析消息体
  384. """
  385. Offer: 消息体
  386. Offer.index: int32
  387. Offer.peer: int32(车端rid)
  388. Offer.type: string
  389. Offer.sdp: string
  390. """
  391. object = protobuf.Offer()
  392. object.ParseFromString(body_data)
  393. methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {object.peer}")
  394. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {self.client_info.get('uid')}")
  395. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {self.client_info.get('rid')}")
  396. # --- send 4012 指定车端
  397. for item in clients.values():
  398. if item.client_info.get('connection_id') == object.peer:
  399. o1 = protobuf.Offer()
  400. o1.index = object.index
  401. o1.peer = self.client_info.get('connection_id') # 舱端id
  402. o1.type = object.type
  403. o1.sdp = object.sdp
  404. re_command_id = protobuf.SC_NotifyOffer # 4012
  405. re_body_length = o1.ByteSize()
  406. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  407. re_body_data = o1.SerializeToString()
  408. re_send_data = re_head_data + re_body_data
  409. methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}")
  410. item.client.write(re_send_data)
  411. def message2005(self, body_data):
  412. # --- 解析消息体
  413. """
  414. Answer: 消息体
  415. Answer.index: int32
  416. Answer.peer: int32(舱端uid)
  417. Answer.type: string
  418. Answer.sdp: string
  419. """
  420. object = protobuf.Answer()
  421. object.ParseFromString(body_data)
  422. methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {object.peer}")
  423. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {self.client_info.get('uid')}")
  424. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {self.client_info.get('rid')}")
  425. # --- send 4011 指定舱端
  426. for item in clients.values():
  427. if item.client_info.get('connection_id') == object.peer:
  428. o1 = protobuf.Answer()
  429. o1.index = object.index
  430. o1.peer = self.client_info.get('connection_id') # 车端id
  431. o1.type = object.type
  432. o1.sdp = object.sdp
  433. re_command_id = protobuf.SC_NotifyAnswer # 4011
  434. re_body_length = o1.ByteSize()
  435. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  436. re_body_data = o1.SerializeToString()
  437. re_send_data = re_head_data + re_body_data
  438. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}")
  439. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"#index: {o1.index}")
  440. item.client.write(re_send_data)
  441. # --- get VehicleStatus ---
  442. unique_dict = {'name': 'VehicleStatus'}
  443. item = Global.mdb.get_one('GlobalVariable', unique_dict)
  444. data = item.get('args', {})
  445. # --- set VehicleStatus ---
  446. data[str(self.connection_id)] = 4
  447. update_dict = {'args': data}
  448. Global.mdb.update_one('GlobalVariable', unique_dict, update_dict)
  449. def message2006(self, body_data):
  450. # --- 解析消息体
  451. """
  452. Candidate: 消息体
  453. Candidate.index: int32
  454. Candidate.peer: int32(车端rid)
  455. Candidate.type: string
  456. Candidate.candidate: string
  457. Candidate.sdpMLineIndex: int32
  458. Candidate.sdpMid: string
  459. Candidate.egotype: int32
  460. """
  461. object = protobuf.Candidate()
  462. object.ParseFromString(body_data)
  463. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {object.peer}")
  464. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {self.client_info.get('uid')}")
  465. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {self.client_info.get('rid')}")
  466. for item in clients.values():
  467. # --- send 4013 舱端到车端
  468. if item.client_info.get('connection_id') == object.peer:
  469. o1 = protobuf.Candidate()
  470. o1.index = object.index
  471. o1.peer = self.client_info.get('connection_id') # 舱端id,
  472. o1.type = object.type
  473. o1.candidate = object.candidate
  474. o1.sdpMLineIndex = object.sdpMLineIndex
  475. o1.sdpMid = object.sdpMid
  476. o1.egotype = object.egotype
  477. re_command_id = protobuf.SC_NotifyCandidate # 4013
  478. re_body_length = o1.ByteSize()
  479. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  480. re_body_data = o1.SerializeToString()
  481. re_send_data = re_head_data + re_body_data
  482. methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}")
  483. item.client.write(re_send_data)
  484. # --- send 4013 车端到舱端
  485. if item.client_info.get('connection_id') == object.peer:
  486. o1 = protobuf.Candidate()
  487. o1.index = object.index
  488. o1.peer = self.client_info.get('connection_id') # 车端id,
  489. o1.type = object.type
  490. o1.candidate = object.candidate
  491. o1.sdpMLineIndex = object.sdpMLineIndex
  492. o1.sdpMid = object.sdpMid
  493. o1.egotype = object.egotype
  494. re_command_id = protobuf.SC_NotifyCandidate # 4013
  495. re_body_length = o1.ByteSize()
  496. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  497. re_body_data = o1.SerializeToString()
  498. re_send_data = re_head_data + re_body_data
  499. methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}")
  500. # methods.debug_log(f"{self.connection_id}|SRIConnection426", f"#candidate.out: {o1.candidate}")
  501. item.client.write(re_send_data)
  502. def message2007(self, body_data):
  503. # --- 解析消息体 2007
  504. """
  505. Leave: 消息体
  506. Leave.peer: int32(车端rid)
  507. Leave.egotype: int32
  508. """
  509. o1 = protobuf.Leave()
  510. o1.ParseFromString(body_data)
  511. methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {o1.peer}")
  512. # --- send 4009 指定车端
  513. for item in clients.values():
  514. if item.client_info.get('connection_id') == o1.peer:
  515. """
  516. Leave: 消息体
  517. Leave.peer: int32(车端rid)
  518. Leave.egotype: int32
  519. """
  520. o2 = protobuf.Leave()
  521. o2.peer = self.client_info.get('connection_id') # 舱端id
  522. o2.egotype = o1.egotype
  523. re_command_id = protobuf.SC_NotifyLeave # 4014
  524. re_body_length = o2.ByteSize()
  525. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  526. re_body_data = o2.SerializeToString()
  527. re_send_data = re_head_data + re_body_data
  528. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  529. item.client.write(re_send_data)
  530. def message2014(self, body_data):
  531. # --- 解析消息体 2014
  532. """
  533. message CSState
  534. {
  535. UserState state=1;
  536. int32 uid=2;
  537. };
  538. """
  539. o1 = protobuf.CSState()
  540. o1.ParseFromString(body_data)
  541. methods.debug_log(f"{self.connection_id}|SRIConnection|572", f"#state: {o1.state}")
  542. methods.debug_log(f"{self.connection_id}|SRIConnection|572", f"#uid: {o1.uid}")
  543. # --- send 4016 发送全部舱端
  544. for item in clients.values():
  545. if item.client_type == 'cockpit':
  546. """
  547. message SCState
  548. {
  549. UserState state=1;
  550. int32 uid=2;
  551. };
  552. """
  553. o2 = protobuf.SCState()
  554. o2.state = o1.state
  555. o2.uid = o1.uid # 车端id
  556. re_command_id = protobuf.SC_State # 4022
  557. re_body_length = o2.ByteSize()
  558. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  559. re_body_data = o2.SerializeToString()
  560. re_send_data = re_head_data + re_body_data
  561. methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}")
  562. item.client.write(re_send_data)
  563. # --- send 6011 指发送车端操作用户的id
  564. for item in clients.values():
  565. if item.client_info.get('connection_id') == o1.uid:
  566. """
  567. Leave: 消息体
  568. Leave.peer: int32(车端rid)
  569. Leave.egotype: int32
  570. """
  571. o3 = protobuf.UserActivityInfo()
  572. o3.user_uuid = self.client_info.get('user_uuid') # 用户uuid
  573. o3.cockpit_id = self.connection_id # 舱端id
  574. o3.vehicle_id = o1.uid # 车端id
  575. re_command_id = protobuf.S2V_SendUserInfo # 6011
  576. re_body_length = o3.ByteSize()
  577. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  578. re_body_data = o3.SerializeToString()
  579. re_send_data = re_head_data + re_body_data
  580. methods.debug_log(f"{self.connection_id}|SRIConnection611", f"re_command_id: {re_command_id}")
  581. item.client.write(re_send_data)
  582. @staticmethod
  583. async def check_clients():
  584. """
  585. 剔除掉线连接
  586. """
  587. count = 0
  588. while True:
  589. count += 1
  590. print(f"#count: {count}", flush=True)
  591. now_at = methods.now_ts()
  592. for connection_id in list(clients.keys()):
  593. object = clients.get(connection_id)
  594. print(f"#client: {object.client}, #update_at: {object.update_at}, #client_type: {object.client_type}",
  595. flush=True)
  596. await asyncio.sleep(3) # 发送消息的间隔时间
  597. @staticmethod
  598. async def run():
  599. """
  600. """
  601. # --- define ---
  602. loop = asyncio.get_running_loop()
  603. server = await loop.create_server(
  604. lambda: SRIConnection(),
  605. '0.0.0.0', Global.egoserver_port
  606. )
  607. # --- start ---
  608. async with server:
  609. print(f"Server listening on 0.0.0.0:{Global.egoserver_port}", flush=True)
  610. # await loop.create_task(SRIConnection.check_clients())
  611. await server.serve_forever()
  612. if __name__ == '__main__':
  613. asyncio.run(SRIConnection.run())