"""
Connection_e1.py

asyncio TCP signalling server that relays protobuf messages between
vehicle clients and cockpit clients.
"""
# from hub import methods, Global
from werkzeug.security import check_password_hash
import struct
import asyncio
import time
import socket
import sys
import importlib

# The project packages are not installed; they are made importable by
# extending sys.path before the importlib calls below, so the order of the
# following statements matters.
# --- for linux
# sys.path.append('/home/sri/repositories/repositories/SRI-DINO.Server-py/sri-server-bg03')
# sys.path.append('/home/sri/repositories/repositories/SRI-DINO.Server-py/sri-pysdk')
# --- for windows
sys.path.append(r'C:\SRI-DINO.Server-py\sri-server-bg03')
sys.path.append(r'C:\SRI-DINO.Server-py\sri-pysdk')

# Protobuf message classes and command-id constants used by every handler.
protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
# Project helper module; this file uses its debug_log, now_ts and trace_log.
methods = importlib.import_module(f"xlib")
# MongoDB client used by the sign-in handler (get_one on 'UserInfo').
# NOTE(review): host, port and credentials are hard-coded here — consider
# loading them from configuration or the environment.
mdb = importlib.import_module(f"xclient.xmongo").Client(host='127.0.0.1', port=47017, database='bg',
                                                        username='admin', password='admin')

# Registry of live connections: {<connection id>: SRIConnection instance}.
# Entries are added in connection_made and removed in connection_lost.
clients = {}

# Maps a vehicle serial to its rid. The rid is stored in client_info but the
# handlers currently address vehicles by connection id instead.
# NOTE(review): 1000002 appears twice — confirm whether the fourth entry was
# meant to be 1000003.
serial_rid_dict = {
    '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
    '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
    'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
    'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
    'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004,  # currently in use
}

# Maps an account name to a uid. Not referenced elsewhere in this file.
account_uid_dict = {
    'ego': 3
}
# live_relationship = {} # {<id-1>id-2>: True}
  33. class SRIConnection(asyncio.Protocol):
  34. """"""
  35. head_sequence = '<hh' # 字节序规则
  36. head_size = struct.calcsize(head_sequence)
  37. message_data = b''
  38. def __init__(self):
  39. self.lock = asyncio.Lock() # 初始化锁
  40. def connection_made(self, client):
  41. """
  42. 建立客户端连接
  43. """
  44. peername = client.get_extra_info('peername')
  45. # self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 客户端id
  46. # self.connection_id = int(f"{peername[1]}") # 客户端id(公网情况)
  47. self.connection_id = int(f"{peername[0].replace('.', '')}") # 客户端id(局域网情况)
  48. self.client = client
  49. # self.data = b''
  50. self.client_type = None
  51. self.client_info = None
  52. self.update_at = methods.now_ts()
  53. clients[self.connection_id] = self
  54. # 获取底层 socket
  55. # sock = self.client.get_extra_info('socket')
  56. # 查看缓冲区大小
  57. # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
  58. # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
  59. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Receive buffer size: {recv_buffer_size}")
  60. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Send buffer size: {send_buffer_size}")
  61. def connection_lost(self, exc):
  62. """
  63. 关闭连接
  64. """
  65. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭")
  66. # --- 处理车端掉线,通知所有舱端
  67. if self.client_type == 'vehicle':
  68. """
  69. SCDelRobot: 消息体
  70. SCDelRobot.peer: int32
  71. SCDelRobot.egotype: int32
  72. """
  73. o2 = protobuf.SCDelRobot()
  74. o2.peer = self.client_info.get('connection_id') # 车端id
  75. o2.egotype = self.client_info.get('egotype')
  76. re_command_id = protobuf.SC_NotifyDel # 4017
  77. re_body_length = o2.ByteSize()
  78. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  79. re_body_data = o2.SerializeToString()
  80. re_send_data = re_head_data + re_body_data
  81. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {re_command_id}")
  82. for item in clients.values():
  83. if item.client_type == 'cockpit':
  84. item.client.write(re_send_data)
  85. # --- 处理舱端掉线,通知所有车端
  86. if self.client_type == 'cockpit':
  87. """
  88. Leave: 消息体
  89. Leave.peer: int32(车端rid)
  90. Leave.egotype: int32
  91. """
  92. o2 = protobuf.Leave()
  93. o2.peer = self.client_info.get('connection_id') # 舱端id
  94. o2.egotype = 1 # 客户端类型
  95. re_command_id = protobuf.SC_NotifyLeave # 4014
  96. re_body_length = o2.ByteSize()
  97. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  98. re_body_data = o2.SerializeToString()
  99. re_send_data = re_head_data + re_body_data
  100. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  101. for item in clients.values():
  102. if item.client_type == 'vehicle':
  103. item.client.write(re_send_data)
  104. # --- clean
  105. if self.connection_id in clients:
  106. del clients[self.connection_id]
  107. # --- clean live_relationship
  108. # keys = [key for key in live_relationship.keys() if self.connection_id in key]
  109. # for key in keys:
  110. # del live_relationship[key]
  111. def data_received(self, data):
  112. """
  113. 消息处理
  114. """
  115. asyncio.create_task(self.handle_data(data))
  116. async def handle_data(self, data):
  117. """
  118. 消息处理
  119. """
  120. # --- before
  121. # loop = asyncio.get_running_loop()
  122. # await loop.run_in_executor(None, self.process_data, data)
  123. # async with self.lock: # 加入锁,确保只有一个任务在处理数据
  124. # await self.process_data(data)
  125. async with self.lock: # 加入锁,确保只有一个任务在处理数据
  126. loop = asyncio.get_running_loop()
  127. await loop.run_in_executor(None, self.process_data, data) # 调用非异步函数
  128. def process_data(self, data):
  129. """
  130. 消息处理
  131. """
  132. try:
  133. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}")
  134. self.message_data += data # 执行了这个以后
  135. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1")
  136. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2")
  137. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3")
  138. # count = 0
  139. while True:
  140. # --- print
  141. # count += 1
  142. # methods.debug_log(f"{self.connection_id}|SRIConnection123",
  143. # f"---------------------------------- while count: {count}")
  144. # 确保有足够的字节来处理消息头
  145. if len(self.message_data) < self.head_size:
  146. break
  147. # 获取消息头
  148. head_data = self.message_data[:self.head_size]
  149. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  150. if command_id not in [2008]:
  151. methods.debug_log(f'{self.connection_id}|SRIConnection176',
  152. f"command_id: {command_id}, body_length: {body_length}")
  153. # 检查命令ID是否有效
  154. if not (1000 < command_id < 9000):
  155. self.message_data = b'' # 清空无效数据
  156. break
  157. # 检查完整消息是否接收完毕
  158. total_length = self.head_size + body_length
  159. if len(self.message_data) < total_length:
  160. break # 数据不完整,等待更多数据
  161. # 调用相应处理方法
  162. method = getattr(self, f'message{command_id}', None)
  163. if method:
  164. method(self.message_data[self.head_size:total_length]) # 处理完整消息
  165. else:
  166. methods.debug_log(f"{self.connection_id}|SRIConnectionError",
  167. f"No handler for command_id: {command_id}")
  168. # 移除已处理的消息
  169. self.message_data = self.message_data[total_length:]
  170. except Exception as exception:
  171. methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
  172. methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")
  173. def message2008(self, body_data):
  174. # methods.debug_log(f"{self.connection_id}|SRIConnection80", f"message: 2008")
  175. pass
  176. def message2009(self, body_data):
  177. # --- 监听 2009
  178. """
  179. CSAdd: 消息体
  180. CSAdd.serial: string
  181. CSAdd.type: int32 EgoType::Car | 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  182. CSAdd.name: string
  183. """
  184. object0 = protobuf.CSAdd()
  185. object0.ParseFromString(body_data)
  186. methods.debug_log(f"{self.connection_id}|SRIConnection219", f"#serial: {object0.serial}")
  187. # methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {object0.name}")
  188. # methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {object0.type}")
  189. # --- update ---
  190. self.client_type = 'vehicle'
  191. self.client_info = {
  192. 'connection_id': self.connection_id,
  193. 'rid': serial_rid_dict.get(object0.serial),
  194. 'name': object0.name,
  195. 'serial': object0.serial,
  196. 'egotype': 2, # 客户端类型
  197. }
  198. """
  199. SCAddRobot: 消息体
  200. SCAddRobot.robot: Robot
  201. Robot: 消息体
  202. Robot.rid: int32
  203. Robot.name: string
  204. Robot.type: int32
  205. Robot.state: RobotState Offline Online Busy
  206. """
  207. o1 = protobuf.Robot()
  208. # o1.rid = self.client_info.get('rid')
  209. o1.rid = self.client_info.get('connection_id')
  210. o1.name = object0.name
  211. o1.type = 2 # 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  212. o1.state = protobuf.Robot.Online
  213. o2 = protobuf.SCAddRobot()
  214. o2.robot.CopyFrom(o1)
  215. re_command_id = protobuf.SC_NotifyAdd # 4016
  216. re_body_length = o2.ByteSize()
  217. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  218. re_body_data = o2.SerializeToString()
  219. re_send_data = re_head_data + re_body_data
  220. # --- send 4016 发送全部舱端
  221. for item in clients.values():
  222. if item.client_type == 'cockpit':
  223. methods.debug_log(f"{self.connection_id}|SRIConnection136", f"re_command_id: {re_command_id}")
  224. item.client.write(re_send_data)
  225. # --- send 4007 todo 凯强说并未用到
  226. # o1 = protobuf.SCAdd()
  227. # o1.ret = True
  228. # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车
  229. # o1.name = object0.name
  230. # re_command_id = protobuf.SC_Add # 4007
  231. # re_body_length = o1.ByteSize()
  232. # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  233. # re_body_data = o1.SerializeToString()
  234. # re_send_data = re_head_data + re_body_data
  235. # methods.debug_log(f"{self.connection_id} | SRIConnection150", f"re_command_id: {re_command_id}")
  236. # self.client.write(re_send_data)
  237. def message2000(self, body_data):
  238. # --- 解析消息体 2000
  239. object = protobuf.CSSign()
  240. object.ParseFromString(body_data)
  241. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {object.account}")
  242. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#password: {object.password}")
  243. # --- update ---
  244. ret = True
  245. if object.account == "Ego" and object.password != '123456':
  246. ret = False
  247. # --- check ---
  248. user = mdb.get_one('UserInfo', {'username': object.account})
  249. name = ''
  250. uuid = ''
  251. if not user:
  252. ret = False
  253. elif not check_password_hash(user.get('password'), object.password):
  254. ret = False
  255. elif int(user.get('state')) == 1:
  256. ret = False
  257. else:
  258. name = user.get('name')
  259. uuid = str(user.get('_id'))
  260. ret = True
  261. # --- update ---
  262. if ret:
  263. self.client_type = 'cockpit'
  264. self.client_info = {
  265. 'connection_id': self.connection_id,
  266. 'uid': 3, # 对应数据库里的ego的id
  267. 'name': object.account, # 对应数据库里
  268. 'egotype': 1, # 舱端类型
  269. 'user_uuid': uuid, # 用户id
  270. }
  271. # --- send 4000
  272. object = protobuf.SCSign()
  273. object.ret = ret # 返回结果
  274. object.uid = self.connection_id
  275. object.name = name # 人员名称
  276. object.user_uuid = uuid # 人员唯一标识
  277. re_command_id = protobuf.SC_Sign # 4000
  278. re_body_length = object.ByteSize()
  279. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  280. re_body_data = object.SerializeToString()
  281. re_send_data = re_head_data + re_body_data
  282. methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}")
  283. self.client.write(re_send_data)
  284. def message2010(self, body_data):
  285. # --- send 4008 发送全部车端信息列表
  286. """
  287. Robot: 消息体
  288. Robot.rid: string
  289. Robot.name: string
  290. Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  291. Robot.state: enum Offline Online Busy
  292. """
  293. # --- 获取全部在线车辆列表
  294. o2 = protobuf.SCRobot()
  295. for item in clients.values():
  296. if item.client_type and item.client_type == 'vehicle':
  297. o1 = protobuf.Robot()
  298. # o1.rid = item.client_info.get('rid')
  299. o1.rid = item.client_info.get('connection_id')
  300. o1.name = item.client_info.get('name')
  301. o1.type = 2 # EgoType::Car
  302. o1.state = protobuf.Robot.RobotState.Value('Online')
  303. o2.robot.add().CopyFrom(o1)
  304. re_command_id = protobuf.SC_Robot # 4008
  305. re_body_length = o2.ByteSize()
  306. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  307. re_body_data = o2.SerializeToString()
  308. re_send_data = re_head_data + re_body_data
  309. methods.debug_log(f"{self.connection_id}|SRIConnection306", f"re_command_id: {re_command_id}")
  310. self.client.write(re_send_data)
  311. def message2001(self, body_data):
  312. # --- 解析消息体
  313. """
  314. CSReq: 消息体
  315. CSReq.peer: int32(rid,车端唯一标识)
  316. CSReq.index: int32(相机位置,RenderPosition)
  317. CSReq.egotype: int32(终端类型,舱端/车端)
  318. """
  319. o1 = protobuf.CSReq()
  320. o1.ParseFromString(body_data)
  321. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.peer}")
  322. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#index: {o1.index}")
  323. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#uid: {self.client_info.get('uid')}")
  324. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#rid: {self.client_info.get('rid')}")
  325. # --- send 4009 指定车端
  326. for item in clients.values():
  327. if item.client_info.get('connection_id') == o1.peer:
  328. """
  329. CSReq: 消息体
  330. CSReq.peer: int32(rid,车端唯一标识)
  331. CSReq.index: int32(相机位置,RenderPosition)
  332. CSReq.egotype: int32(终端类型,舱端/车端)
  333. """
  334. o2 = protobuf.CSReq()
  335. o2.peer = self.client_info.get('connection_id') # 舱端id
  336. o2.index = o1.index
  337. o2.egotype = o1.egotype
  338. re_command_id = protobuf.SC_NotifyReq # 4009
  339. re_body_length = o2.ByteSize()
  340. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  341. re_body_data = o2.SerializeToString()
  342. re_send_data = re_head_data + re_body_data
  343. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  344. item.client.write(re_send_data)
  345. def message2002(self, body_data):
  346. # --- 解析消息体
  347. """
  348. CSRep: 消息体
  349. CSRep.desc: VideoDesc
  350. CSRep.peer: int32
  351. CSRep.index: int32
  352. CSRep.egotype: int32
  353. """
  354. o1 = protobuf.CSRep()
  355. o1.ParseFromString(body_data)
  356. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {o1.peer}")
  357. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {self.client_info.get('uid')}")
  358. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {self.client_info.get('rid')}")
  359. # --- send 4010 指定舱端
  360. for item in clients.values():
  361. if item.client_info.get('connection_id') == o1.peer:
  362. o2 = protobuf.CSRep()
  363. o2.desc = o1.desc
  364. o2.peer = self.client_info.get('connection_id') # 车端id
  365. o2.index = o1.index
  366. o2.egotype = o1.egotype
  367. re_command_id = protobuf.SC_NotifyRep # 4010
  368. re_body_length = o2.ByteSize()
  369. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  370. re_body_data = o2.SerializeToString()
  371. re_send_data = re_head_data + re_body_data
  372. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  373. item.client.write(re_send_data)
  374. def message2004(self, body_data):
  375. # --- 解析消息体
  376. """
  377. Offer: 消息体
  378. Offer.index: int32
  379. Offer.peer: int32(车端rid)
  380. Offer.type: string
  381. Offer.sdp: string
  382. """
  383. object = protobuf.Offer()
  384. object.ParseFromString(body_data)
  385. methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {object.peer}")
  386. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {self.client_info.get('uid')}")
  387. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {self.client_info.get('rid')}")
  388. # --- send 4012 指定车端
  389. for item in clients.values():
  390. if item.client_info.get('connection_id') == object.peer:
  391. o1 = protobuf.Offer()
  392. o1.index = object.index
  393. o1.peer = self.client_info.get('connection_id') # 舱端id
  394. o1.type = object.type
  395. o1.sdp = object.sdp
  396. re_command_id = protobuf.SC_NotifyOffer # 4012
  397. re_body_length = o1.ByteSize()
  398. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  399. re_body_data = o1.SerializeToString()
  400. re_send_data = re_head_data + re_body_data
  401. methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}")
  402. item.client.write(re_send_data)
  403. def message2005(self, body_data):
  404. # --- 解析消息体
  405. """
  406. Answer: 消息体
  407. Answer.index: int32
  408. Answer.peer: int32(舱端uid)
  409. Answer.type: string
  410. Answer.sdp: string
  411. """
  412. object = protobuf.Answer()
  413. object.ParseFromString(body_data)
  414. methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {object.peer}")
  415. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {self.client_info.get('uid')}")
  416. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {self.client_info.get('rid')}")
  417. # --- send 4011 指定舱端
  418. for item in clients.values():
  419. if item.client_info.get('connection_id') == object.peer:
  420. o1 = protobuf.Answer()
  421. o1.index = object.index
  422. o1.peer = self.client_info.get('connection_id') # 车端id
  423. o1.type = object.type
  424. o1.sdp = object.sdp
  425. re_command_id = protobuf.SC_NotifyAnswer # 4011
  426. re_body_length = o1.ByteSize()
  427. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  428. re_body_data = o1.SerializeToString()
  429. re_send_data = re_head_data + re_body_data
  430. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}")
  431. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"#index: {o1.index}")
  432. item.client.write(re_send_data)
  433. def message2006(self, body_data):
  434. # --- 解析消息体
  435. """
  436. Candidate: 消息体
  437. Candidate.index: int32
  438. Candidate.peer: int32(车端rid)
  439. Candidate.type: string
  440. Candidate.candidate: string
  441. Candidate.sdpMLineIndex: int32
  442. Candidate.sdpMid: string
  443. Candidate.egotype: int32
  444. """
  445. object = protobuf.Candidate()
  446. object.ParseFromString(body_data)
  447. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {object.peer}")
  448. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {self.client_info.get('uid')}")
  449. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {self.client_info.get('rid')}")
  450. for item in clients.values():
  451. # --- send 4013 舱端到车端
  452. if item.client_info.get('connection_id') == object.peer:
  453. o1 = protobuf.Candidate()
  454. o1.index = object.index
  455. o1.peer = self.client_info.get('connection_id') # 舱端id,
  456. o1.type = object.type
  457. o1.candidate = object.candidate
  458. o1.sdpMLineIndex = object.sdpMLineIndex
  459. o1.sdpMid = object.sdpMid
  460. o1.egotype = object.egotype
  461. re_command_id = protobuf.SC_NotifyCandidate # 4013
  462. re_body_length = o1.ByteSize()
  463. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  464. re_body_data = o1.SerializeToString()
  465. re_send_data = re_head_data + re_body_data
  466. methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}")
  467. item.client.write(re_send_data)
  468. # --- send 4013 车端到舱端
  469. if item.client_info.get('connection_id') == object.peer:
  470. o1 = protobuf.Candidate()
  471. o1.index = object.index
  472. o1.peer = self.client_info.get('connection_id') # 车端id,
  473. o1.type = object.type
  474. o1.candidate = object.candidate
  475. o1.sdpMLineIndex = object.sdpMLineIndex
  476. o1.sdpMid = object.sdpMid
  477. o1.egotype = object.egotype
  478. re_command_id = protobuf.SC_NotifyCandidate # 4013
  479. re_body_length = o1.ByteSize()
  480. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  481. re_body_data = o1.SerializeToString()
  482. re_send_data = re_head_data + re_body_data
  483. methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}")
  484. # methods.debug_log(f"{self.connection_id}|SRIConnection426", f"#candidate.out: {o1.candidate}")
  485. item.client.write(re_send_data)
  486. def message2007(self, body_data):
  487. # --- 解析消息体
  488. """
  489. Leave: 消息体
  490. Leave.peer: int32(车端rid)
  491. Leave.egotype: int32
  492. """
  493. o1 = protobuf.Leave()
  494. o1.ParseFromString(body_data)
  495. methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {o1.peer}")
  496. # --- send 4009 指定车端
  497. for item in clients.values():
  498. if item.client_info.get('connection_id') == o1.peer:
  499. """
  500. Leave: 消息体
  501. Leave.peer: int32(车端rid)
  502. Leave.egotype: int32
  503. """
  504. o2 = protobuf.Leave()
  505. o2.peer = self.client_info.get('connection_id') # 舱端id
  506. o2.egotype = o1.egotype
  507. re_command_id = protobuf.SC_NotifyLeave # 4014
  508. re_body_length = o2.ByteSize()
  509. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  510. re_body_data = o2.SerializeToString()
  511. re_send_data = re_head_data + re_body_data
  512. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  513. item.client.write(re_send_data)
  514. def message2014(self, body_data):
  515. # --- 解析消息体 2014
  516. """
  517. message CSState
  518. {
  519. UserState state=1;
  520. int32 uid=2;
  521. };
  522. """
  523. o1 = protobuf.CSState()
  524. o1.ParseFromString(body_data)
  525. methods.debug_log(f"{self.connection_id}|SRIConnection|572", f"#state: {o1.state}")
  526. methods.debug_log(f"{self.connection_id}|SRIConnection|572", f"#uid: {o1.uid}")
  527. # --- send 4016 发送全部舱端
  528. for item in clients.values():
  529. if item.client_type == 'cockpit':
  530. """
  531. message SCState
  532. {
  533. UserState state=1;
  534. int32 uid=2;
  535. };
  536. """
  537. o2 = protobuf.SCState()
  538. o2.state = o1.state
  539. o2.uid = o1.uid # 车端id
  540. re_command_id = protobuf.SC_State # 4022
  541. re_body_length = o2.ByteSize()
  542. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  543. re_body_data = o2.SerializeToString()
  544. re_send_data = re_head_data + re_body_data
  545. methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}")
  546. item.client.write(re_send_data)
  547. # --- send 6011 指发送车端操作用户的id
  548. for item in clients.values():
  549. if item.client_info.get('connection_id') == o1.uid:
  550. """
  551. Leave: 消息体
  552. Leave.peer: int32(车端rid)
  553. Leave.egotype: int32
  554. """
  555. o3 = protobuf.UserActivityInfo()
  556. o3.user_uuid = self.client_info.get('user_uuid') # 用户uuid
  557. o3.cockpit_id = self.connection_id # 舱端id
  558. o3.vehicle_id = o1.uid # 车端id
  559. re_command_id = protobuf.S2V_SendUserInfo # 6011
  560. re_body_length = o3.ByteSize()
  561. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  562. re_body_data = o3.SerializeToString()
  563. re_send_data = re_head_data + re_body_data
  564. methods.debug_log(f"{self.connection_id}|SRIConnection611", f"re_command_id: {re_command_id}")
  565. item.client.write(re_send_data)
  566. @staticmethod
  567. async def check_clients():
  568. """
  569. 剔除掉线连接
  570. """
  571. count = 0
  572. while True:
  573. count += 1
  574. print(f"#count: {count}", flush=True)
  575. now_at = methods.now_ts()
  576. for connection_id in list(clients.keys()):
  577. object = clients.get(connection_id)
  578. print(f"#client: {object.client}, #update_at: {object.update_at}, #client_type: {object.client_type}",
  579. flush=True)
  580. await asyncio.sleep(3) # 发送消息的间隔时间
  581. @staticmethod
  582. async def run():
  583. """
  584. """
  585. # --- define ---
  586. loop = asyncio.get_running_loop()
  587. server = await loop.create_server(
  588. lambda: SRIConnection(),
  589. '0.0.0.0', 20917
  590. )
  591. # --- start ---
  592. async with server:
  593. print("Server listening on 0.0.0.0:20917", flush=True)
  594. # await loop.create_task(SRIConnection.check_clients())
  595. await server.serve_forever()
  596. if __name__ == '__main__':
  597. asyncio.run(SRIConnection.run())