Connection_e1.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733
  1. """
  2. """
  3. from hub import methods, Global, protobuf
  4. from werkzeug.security import check_password_hash
  5. import struct
  6. import asyncio
  7. import time
  8. import socket
  9. clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  10. serial_rid_dict = {
  11. '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
  12. '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
  13. 'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
  14. 'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
  15. 'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004, # 正在使用
  16. }
  17. account_uid_dict = {
  18. 'ego': 3
  19. }
  20. # live_relationship = {} # {<id-1>id-2>: True}
  21. class SRIConnection(asyncio.Protocol):
  22. """"""
  23. head_sequence = '<hh' # <: 小端字节序 h: 短整型/short/2字节
  24. head_size = struct.calcsize(head_sequence)
  25. message_data = b''
  26. def __init__(self):
  27. self.lock = asyncio.Lock() # 初始化锁
  28. def connection_made(self, client):
  29. """
  30. 建立客户端连接
  31. """
  32. peername = client.get_extra_info('peername')
  33. # self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 客户端id
  34. # self.connection_id = int(f"{peername[1]}") # 客户端id(公网情况)
  35. self.connection_id = int(f"{peername[0].replace('.', '')}") # 客户端id(局域网情况)
  36. self.client = client
  37. # self.data = b''
  38. self.client_type = '' # 连接类型 vehicle 车端 cockpit 舱端
  39. self.client_info = None
  40. self.client_info = {
  41. 'local_connection_id': 0, # 本端id
  42. 'remote_connection_id': 0, # 对端id
  43. } # 连接信息
  44. self.update_at = methods.now_ts()
  45. clients[self.connection_id] = self
  46. # --- get VehicleStatus ---
  47. unique_dict = {'name': 'VehicleStatus'}
  48. item = Global.mdb.get_one('GlobalVariable', unique_dict)
  49. data = item.get('args', {})
  50. # --- set VehicleStatus ---
  51. data[str(self.connection_id)] = 2 # 车辆状态 1 离线 2 在线空闲 3 现场驾驶中 4 远程驾驶中
  52. update_dict = {'args': data}
  53. Global.mdb.update_one('GlobalVariable', unique_dict, update_dict)
  54. # 获取底层 socket
  55. # sock = self.client.get_extra_info('socket')
  56. # 查看缓冲区大小
  57. # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
  58. # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
  59. # methods.debug_log(f'{self.connection_id}|Connection_e1:70', f"Receive buffer size: {recv_buffer_size}")
  60. # methods.debug_log(f'{self.connection_id}|Connection_e1:70', f"Send buffer size: {send_buffer_size}")
  61. def connection_lost(self, exc):
  62. """
  63. 关闭连接
  64. """
  65. methods.debug_log(f"{self.connection_id}|Connection_e1:65", f"连接已关闭")
  66. methods.debug_log(f"{self.connection_id}|Connection_e1:65", f"clients: len{clients.keys()}")
  67. # --- get VehicleStatus ---
  68. unique_dict = {'name': 'VehicleStatus'}
  69. item = Global.mdb.get_one('GlobalVariable', unique_dict)
  70. data = item.get('args', {})
  71. # --- set VehicleStatus ---
  72. data[str(self.connection_id)] = 1 # 车辆状态 1 离线 2 在线空闲 3 现场驾驶中 4 远程驾驶中
  73. update_dict = {'args': data}
  74. Global.mdb.update_one('GlobalVariable', unique_dict, update_dict)
  75. # --- 处理车端掉线,通知所有舱端
  76. if self.client_type == 'vehicle':
  77. """
  78. SCDelRobot: 消息体
  79. SCDelRobot.peer: int32
  80. SCDelRobot.egotype: int32
  81. """
  82. o2 = protobuf.SCDelRobot()
  83. o2.peer = self.client_info.get('local_connection_id') # 车端id
  84. o2.egotype = self.client_info.get('egotype')
  85. re_command_id = protobuf.SC_NotifyDel # 4017
  86. re_body_length = o2.ByteSize()
  87. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  88. re_body_data = o2.SerializeToString()
  89. re_send_data = re_head_data + re_body_data
  90. methods.debug_log(f"{self.connection_id}|Connection_e1:102", f"re_command_id: {re_command_id}")
  91. for item in clients.values():
  92. if item.client_type == 'cockpit':
  93. item.client.write(re_send_data)
  94. # --- 处理某个舱端掉线,通知指定车端,如果是连接的话
  95. if self.client_type == 'cockpit':
  96. """
  97. Leave: 消息体
  98. Leave.peer: int32(车端rid)
  99. Leave.egotype: int32
  100. """
  101. o2 = protobuf.Leave()
  102. o2.peer = self.client_info.get('local_connection_id') # 舱端id
  103. o2.egotype = 1 # 客户端类型
  104. re_command_id = protobuf.SC_NotifyLeave # 4014
  105. re_body_length = o2.ByteSize()
  106. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  107. re_body_data = o2.SerializeToString()
  108. re_send_data = re_head_data + re_body_data
  109. methods.debug_log(f"{self.connection_id}|Connection_e1:515", f"re_command_id: {re_command_id}") # 4014
  110. # --- 筛选指定车端
  111. for item in clients.values():
  112. if item.client_type != 'vehicle':
  113. continue
  114. # 判断当前id是否是需要被断开的id
  115. if self.client_info.get('local_connection_id') != item.client_info.get('remote_connection_id'):
  116. continue
  117. # 发送断开信号给车端
  118. item.client.write(re_send_data)
  119. # --- clean
  120. if self.connection_id in clients:
  121. del clients[self.connection_id]
  122. # --- clean live_relationship
  123. # keys = [key for key in live_relationship.keys() if self.connection_id in key]
  124. # for key in keys:
  125. # del live_relationship[key]
  126. def data_received(self, data):
  127. """
  128. 消息处理
  129. """
  130. asyncio.create_task(self.handle_data(data))
  131. async def handle_data(self, data):
  132. """
  133. 消息处理
  134. """
  135. # --- before
  136. # loop = asyncio.get_running_loop()
  137. # await loop.run_in_executor(None, self.process_data, data)
  138. # async with self.lock: # 加入锁,确保只有一个任务在处理数据
  139. # await self.process_data(data)
  140. async with self.lock: # 加入锁,确保只有一个任务在处理数据
  141. loop = asyncio.get_running_loop()
  142. await loop.run_in_executor(None, self.process_data, data) # 调用非异步函数
  143. def process_data(self, data):
  144. """
  145. 消息处理
  146. """
  147. try:
  148. # methods.debug_log(f"{self.connection_id}|Connection_e1:114", f"receive: {len(data)}, message: {repr(data)}")
  149. self.message_data += data # 执行了这个以后
  150. # methods.debug_log(f"{self.connection_id}|Connection_e1:114", f"--- 1")
  151. # methods.debug_log(f"{self.connection_id}|Connection_e1:114", f"--- 2")
  152. # methods.debug_log(f"{self.connection_id}|Connection_e1:114", f"--- 3")
  153. # count = 0
  154. while True:
  155. # --- print
  156. # count += 1
  157. # methods.debug_log(f"{self.connection_id}|Connection_e1:123",
  158. # f"---------------------------------- while count: {count}")
  159. # 确保有足够的字节来处理消息头
  160. if len(self.message_data) < self.head_size:
  161. break
  162. # 获取消息头
  163. head_data = self.message_data[:self.head_size]
  164. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  165. if command_id not in [2008]:
  166. methods.debug_log(f'{self.connection_id}|Connection_e1:166',
  167. f"command_id: {command_id}, body_length: {body_length}")
  168. # else:
  169. # methods.debug_log(f'{self.connection_id}|Connection_e1:.169',
  170. # f"command_id: {command_id}, body_length: {body_length}")
  171. # 检查命令ID是否有效
  172. if not (1000 < command_id < 9000):
  173. self.message_data = b'' # 清空无效数据
  174. break
  175. # 检查完整消息是否接收完毕
  176. total_length = self.head_size + body_length
  177. if len(self.message_data) < total_length:
  178. break # 数据不完整,等待更多数据
  179. # 调用相应处理方法
  180. method = getattr(self, f'message{command_id}', None)
  181. if method:
  182. method(self.message_data[self.head_size:total_length]) # 处理完整消息
  183. else:
  184. methods.debug_log(f"{self.connection_id}|Connection_e1:211",
  185. f"error212: 没有找到'{command_id}'对应的处理方法")
  186. # 移除已处理的消息
  187. self.message_data = self.message_data[total_length:]
  188. except Exception as exception:
  189. methods.debug_log(f"{self.connection_id}|Connection_e1:132", f"#exception: {exception}")
  190. methods.debug_log(f"{self.connection_id}|Connection_e1:123", f"#traceback: {methods.trace_log()}")
  191. def message2008(self, body_data):
  192. # methods.debug_log(f"{self.connection_id}|Connection_e1:80", f"message: 2008")
  193. pass
  194. def message2009(self, body_data):
  195. """
  196. 车端上线
  197. """
  198. # --- 监听 2009
  199. """
  200. CSAdd: 消息体
  201. CSAdd.serial: string
  202. CSAdd.type: int32 EgoType::Car | 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  203. CSAdd.name: string
  204. """
  205. object0 = protobuf.CSAdd()
  206. object0.ParseFromString(body_data)
  207. methods.debug_log(f"{self.connection_id}|Connection_e1:219", f"serial: {object0.serial}")
  208. # methods.debug_log(f"{self.connection_id}|Connection_e1:90", f"#name: {object0.name}")
  209. # methods.debug_log(f"{self.connection_id}|Connection_e1:90", f"#type: {object0.type}")
  210. # --- update ---
  211. self.client_type = 'vehicle'
  212. self.client_info = {
  213. 'local_connection_id': self.connection_id,
  214. 'rid': serial_rid_dict.get(object0.serial),
  215. 'name': object0.name,
  216. 'serial': object0.serial,
  217. 'egotype': 2, # 客户端类型 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  218. }
  219. methods.debug_log(f"{self.connection_id}|Connection_e1:259", f"client_info: {self.client_info}")
  220. """
  221. SCAddRobot: 消息体
  222. SCAddRobot.robot: Robot
  223. Robot: 消息体
  224. Robot.rid: int32
  225. Robot.name: string
  226. Robot.type: int32
  227. Robot.state: RobotState Offline Online Busy
  228. """
  229. o1 = protobuf.Robot()
  230. # o1.rid = self.client_info.get('rid')
  231. o1.rid = self.client_info.get('local_connection_id')
  232. o1.name = object0.name
  233. o1.type = 2 # 客户端类型 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  234. o1.state = protobuf.Robot.Online
  235. o2 = protobuf.SCAddRobot()
  236. o2.robot.CopyFrom(o1)
  237. re_command_id = protobuf.SC_NotifyAdd # 4016
  238. re_body_length = o2.ByteSize()
  239. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  240. re_body_data = o2.SerializeToString()
  241. re_send_data = re_head_data + re_body_data
  242. # --- send 4016 发送全部舱端
  243. for item in clients.values():
  244. if item.client_type == 'cockpit':
  245. methods.debug_log(f"{self.connection_id}|Connection_e1:136", f"re_command_id: {re_command_id}")
  246. item.client.write(re_send_data)
  247. # --- send 4007 todo 凯强说并未用到
  248. # o1 = protobuf.SCAdd()
  249. # o1.ret = True
  250. # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车
  251. # o1.name = object0.name
  252. # re_command_id = protobuf.SC_Add # 4007
  253. # re_body_length = o1.ByteSize()
  254. # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  255. # re_body_data = o1.SerializeToString()
  256. # re_send_data = re_head_data + re_body_data
  257. # methods.debug_log(f"{self.connection_id} | Connection_e1:150", f"re_command_id: {re_command_id}")
  258. # self.client.write(re_send_data)
  259. def message2000(self, body_data):
  260. # --- 监听 2000
  261. object = protobuf.CSSign()
  262. object.ParseFromString(body_data)
  263. methods.debug_log(f"{self.connection_id}|Connection_e1:293", f"account: {object.account}")
  264. methods.debug_log(f"{self.connection_id}|Connection_e1:293", f"password: {object.password}")
  265. # --- check ---
  266. ret = True
  267. if object.account == "Ego" and object.password != '123456':
  268. ret = False
  269. # --- check ---
  270. ret = True
  271. name = ''
  272. uuid = ''
  273. if object.account in ['Ego', 'test003']:
  274. if object.password not in ['123456']:
  275. ret = False
  276. else:
  277. user = Global.mdb.get_one('UserInfo', {'username': object.account})
  278. if not user:
  279. ret = False
  280. elif not check_password_hash(user.get('password'), object.password):
  281. ret = False
  282. elif user.get('state') and int(user.get('state')) == 1:
  283. ret = False
  284. else:
  285. name = user.get('name')
  286. uuid = str(user.get('_id'))
  287. ret = True
  288. methods.debug_log(f"{self.connection_id}|Connection_e1:293", f"用户'{object.account}'登录认证结果: {ret}")
  289. # --- update ---
  290. if ret:
  291. self.client_type = 'cockpit'
  292. self.client_info = {
  293. 'local_connection_id': self.connection_id, # 连接id
  294. 'uid': 3, # 对应数据库里的ego的id
  295. 'name': object.account, # 对应数据库里
  296. 'egotype': 1, # 舱端类型
  297. 'user_uuid': uuid, # 用户id
  298. }
  299. # --- 发送 4000
  300. object = protobuf.SCSign()
  301. object.ret = ret # 返回结果
  302. object.uid = self.connection_id
  303. object.name = name # 人员名称
  304. object.user_uuid = uuid # 人员唯一标识
  305. re_command_id = protobuf.SC_Sign # 4000
  306. re_body_length = object.ByteSize()
  307. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  308. re_body_data = object.SerializeToString()
  309. re_send_data = re_head_data + re_body_data
  310. methods.debug_log(f"{self.connection_id}|Connection_e1:352", f"re_command_id: {re_command_id}")
  311. self.client.write(re_send_data)
  312. def message2010(self, body_data):
  313. # --- 监听 2010 发送 4008 发送全部车端信息列表
  314. """
  315. Robot: 消息体
  316. Robot.rid: string
  317. Robot.name: string
  318. Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  319. Robot.state: enum Offline Online Busy
  320. """
  321. # --- 获取全部在线车辆列表
  322. o2 = protobuf.SCRobot()
  323. for item in clients.values():
  324. if item.client_type and item.client_type == 'vehicle':
  325. o1 = protobuf.Robot()
  326. # o1.rid = item.client_info.get('rid')
  327. o1.rid = item.client_info.get('local_connection_id')
  328. o1.name = item.client_info.get('name')
  329. o1.type = 2 # 客户端类型 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  330. o1.state = protobuf.Robot.RobotState.Value('Online')
  331. o2.robot.add().CopyFrom(o1)
  332. # --- 发送 4008
  333. re_command_id = protobuf.SC_Robot
  334. re_body_length = o2.ByteSize()
  335. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  336. re_body_data = o2.SerializeToString()
  337. re_send_data = re_head_data + re_body_data
  338. methods.debug_log(f"{self.connection_id}|Connection_e1:306", f"re_command_id: {re_command_id}")
  339. self.client.write(re_send_data)
  340. def message2001(self, body_data):
  341. # --- 监听 2001
  342. """
  343. CSReq: 消息体
  344. CSReq.peer: int32(rid,车端唯一标识)
  345. CSReq.index: int32(相机位置,RenderPosition)
  346. CSReq.egotype: int32(终端类型,舱端/车端)
  347. """
  348. o1 = protobuf.CSReq()
  349. o1.ParseFromString(body_data)
  350. methods.debug_log(f"{self.connection_id}|Connection_e1:299.message2001", f"#peer: {o1.peer}")
  351. methods.debug_log(f"{self.connection_id}|Connection_e1:299.message2001", f"#index: {o1.index}")
  352. # methods.debug_log(f"{self.connection_id}|Connection_e1:299.message2001", f"#uid: {self.client_info.get('uid')}")
  353. # methods.debug_log(f"{self.connection_id}|Connection_e1:299.message2001", f"#rid: {self.client_info.get('rid')}")
  354. # --- send 4009 指定车端
  355. for item in clients.values():
  356. if item.client_info.get('local_connection_id') == o1.peer:
  357. """
  358. CSReq: 消息体
  359. CSReq.peer: int32(rid,车端唯一标识)
  360. CSReq.index: int32(相机位置,RenderPosition)
  361. CSReq.egotype: int32(终端类型,舱端/车端)
  362. """
  363. o2 = protobuf.CSReq()
  364. o2.peer = self.client_info.get('local_connection_id') # 舱端id
  365. o2.index = o1.index
  366. o2.egotype = o1.egotype
  367. re_command_id = protobuf.SC_NotifyReq # 4009
  368. re_body_length = o2.ByteSize()
  369. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  370. re_body_data = o2.SerializeToString()
  371. re_send_data = re_head_data + re_body_data
  372. methods.debug_log(f"{self.connection_id}|Connection_e1:217", f"re_command_id: {re_command_id}")
  373. item.client.write(re_send_data)
  374. def message2002(self, body_data):
  375. # --- 监听 2002
  376. """
  377. CSRep: 消息体
  378. CSRep.desc: VideoDesc
  379. CSRep.peer: int32
  380. CSRep.index: int32
  381. CSRep.egotype: int32
  382. """
  383. o1 = protobuf.CSRep()
  384. o1.ParseFromString(body_data)
  385. methods.debug_log(f"{self.connection_id}|Connection_e1:319", f"peer: {o1.peer}")
  386. # methods.debug_log(f"{self.connection_id}|Connection_e1:235", f"#uid: {self.client_info.get('uid')}")
  387. # methods.debug_log(f"{self.connection_id}|Connection_e1:235", f"#rid: {self.client_info.get('rid')}")
  388. # --- send 4010 指定舱端
  389. for item in clients.values():
  390. if item.client_info.get('local_connection_id') == o1.peer:
  391. o2 = protobuf.CSRep()
  392. o2.desc = o1.desc
  393. o2.peer = self.client_info.get('local_connection_id') # 车端id
  394. o2.index = o1.index
  395. o2.egotype = o1.egotype
  396. re_command_id = protobuf.SC_NotifyRep # 4010
  397. re_body_length = o2.ByteSize()
  398. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  399. re_body_data = o2.SerializeToString()
  400. re_send_data = re_head_data + re_body_data
  401. methods.debug_log(f"{self.connection_id}|Connection_e1:217", f"re_command_id: {re_command_id}")
  402. item.client.write(re_send_data)
  403. def message2004(self, body_data):
  404. # --- 监听舱端 2004
  405. """
  406. Offer: 消息体
  407. Offer.index: int32
  408. Offer.peer: int32(车端rid)
  409. Offer.type: string
  410. Offer.sdp: string
  411. """
  412. object = protobuf.Offer()
  413. object.ParseFromString(body_data)
  414. methods.debug_log(f"{self.connection_id}|Connection_e1:348", f"车端id: {object.peer}")
  415. # methods.debug_log(f"{self.connection_id}|Connection_e1:348", f"uid: {self.client_info.get('uid')}")
  416. # methods.debug_log(f"{self.connection_id}|Connection_e1:348", f"rid: {self.client_info.get('rid')}")
  417. # --- 删选指定车端
  418. for item in clients.values():
  419. if item.client_info.get('local_connection_id') == object.peer:
  420. o1 = protobuf.Offer()
  421. o1.index = object.index
  422. o1.peer = self.client_info.get('local_connection_id') # 舱端id
  423. o1.type = object.type
  424. o1.sdp = object.sdp
  425. # --- 设置
  426. self.client_info['remote_connection_id'] = object.peer # 车端id
  427. item.client_info['remote_connection_id'] = self.client_info.get('local_connection_id') # 舱端id
  428. # --- 发送 4012
  429. re_command_id = protobuf.SC_NotifyOffer # 4012
  430. re_body_length = o1.ByteSize()
  431. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  432. re_body_data = o1.SerializeToString()
  433. re_send_data = re_head_data + re_body_data
  434. methods.debug_log(f"{self.connection_id}|Connection_e1:341", f"re_command_id: {re_command_id}")
  435. item.client.write(re_send_data)
  436. def message2005(self, body_data):
  437. # --- 监听 2005
  438. """
  439. Answer: 消息体
  440. Answer.index: int32
  441. Answer.peer: int32(舱端uid)
  442. Answer.type: string
  443. Answer.sdp: string
  444. """
  445. object = protobuf.Answer()
  446. object.ParseFromString(body_data)
  447. methods.debug_log(f"{self.connection_id}|Connection_e1:411", f"peer: {object.peer}")
  448. # methods.debug_log(f"{self.connection_id}|Connection_e1:411", f"uid: {self.client_info.get('uid')}")
  449. # methods.debug_log(f"{self.connection_id}|Connection_e1:411", f"rid: {self.client_info.get('rid')}")
  450. # --- send 4011 指定舱端
  451. for item in clients.values():
  452. if item.client_info.get('local_connection_id') == object.peer:
  453. o1 = protobuf.Answer()
  454. o1.index = object.index
  455. o1.peer = self.client_info.get('local_connection_id') # 车端id
  456. o1.type = object.type
  457. o1.sdp = object.sdp
  458. re_command_id = protobuf.SC_NotifyAnswer # 4011
  459. re_body_length = o1.ByteSize()
  460. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  461. re_body_data = o1.SerializeToString()
  462. re_send_data = re_head_data + re_body_data
  463. methods.debug_log(f"{self.connection_id}|Connection_e1:428", f"re_command_id: {re_command_id}")
  464. methods.debug_log(f"{self.connection_id}|Connection_e1:428", f"index: {o1.index}")
  465. item.client.write(re_send_data)
  466. # --- get VehicleStatus ---
  467. unique_dict = {'name': 'VehicleStatus'}
  468. item = Global.mdb.get_one('GlobalVariable', unique_dict)
  469. data = item.get('args', {})
  470. # --- set VehicleStatus ---
  471. data[str(self.connection_id)] = 4 # 车辆状态 1 离线 2 在线空闲 3 现场驾驶中 4 远程驾驶中
  472. update_dict = {'args': data}
  473. Global.mdb.update_one('GlobalVariable', unique_dict, update_dict)
  474. def message2006(self, body_data):
  475. # --- 监听 2006
  476. """
  477. Candidate: 消息体
  478. Candidate.index: int32
  479. Candidate.peer: int32(车端rid)
  480. Candidate.type: string
  481. Candidate.candidate: string
  482. Candidate.sdpMLineIndex: int32
  483. Candidate.sdpMid: string
  484. Candidate.egotype: int32
  485. """
  486. object = protobuf.Candidate()
  487. object.ParseFromString(body_data)
  488. # methods.debug_log(f"{self.connection_id}|Connection_e1:413", f"peer: {object.peer}")
  489. # methods.debug_log(f"{self.connection_id}|Connection_e1:413", f"uid: {self.client_info.get('uid')}")
  490. # methods.debug_log(f"{self.connection_id}|Connection_e1:413", f"rid: {self.client_info.get('rid')}")
  491. for item in clients.values():
  492. # --- send 4013 舱端到车端
  493. if item.client_info.get('local_connection_id') == object.peer:
  494. o1 = protobuf.Candidate()
  495. o1.index = object.index
  496. o1.peer = self.client_info.get('local_connection_id') # 舱端id,
  497. o1.type = object.type
  498. o1.candidate = object.candidate
  499. o1.sdpMLineIndex = object.sdpMLineIndex
  500. o1.sdpMid = object.sdpMid
  501. o1.egotype = object.egotype
  502. re_command_id = protobuf.SC_NotifyCandidate # 4013
  503. re_body_length = o1.ByteSize()
  504. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  505. re_body_data = o1.SerializeToString()
  506. re_send_data = re_head_data + re_body_data
  507. # methods.debug_log(f"{self.connection_id}|Connection_e1:409", f"re_command_id: {re_command_id}")
  508. item.client.write(re_send_data)
  509. # --- send 4013 车端到舱端
  510. if item.client_info.get('local_connection_id') == object.peer:
  511. o1 = protobuf.Candidate()
  512. o1.index = object.index
  513. o1.peer = self.client_info.get('local_connection_id') # 车端id,
  514. o1.type = object.type
  515. o1.candidate = object.candidate
  516. o1.sdpMLineIndex = object.sdpMLineIndex
  517. o1.sdpMid = object.sdpMid
  518. o1.egotype = object.egotype
  519. re_command_id = protobuf.SC_NotifyCandidate # 4013
  520. re_body_length = o1.ByteSize()
  521. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  522. re_body_data = o1.SerializeToString()
  523. re_send_data = re_head_data + re_body_data
  524. # methods.debug_log(f"{self.connection_id}|Connection_e1:426", f"re_command_id: {re_command_id}")
  525. # methods.debug_log(f"{self.connection_id}|Connection_e1:426", f"candidate.out: {o1.candidate}")
  526. item.client.write(re_send_data)
  527. def message2007(self, body_data):
  528. # --- 监听 2007
  529. """
  530. Leave: 消息体
  531. Leave.peer: int32(车端rid)
  532. Leave.egotype: int32
  533. """
  534. o1 = protobuf.Leave()
  535. o1.ParseFromString(body_data)
  536. methods.debug_log(f"{self.connection_id}|Connection_e1:497", f"peer: {o1.peer}")
  537. # --- send 4009 指定车端
  538. for item in clients.values():
  539. if item.client_info.get('local_connection_id') == o1.peer:
  540. """
  541. Leave: 消息体
  542. Leave.peer: int32(车端rid)
  543. Leave.egotype: int32
  544. """
  545. o2 = protobuf.Leave()
  546. o2.peer = self.client_info.get('local_connection_id') # 舱端id
  547. o2.egotype = o1.egotype
  548. re_command_id = protobuf.SC_NotifyLeave # 4014
  549. re_body_length = o2.ByteSize()
  550. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  551. re_body_data = o2.SerializeToString()
  552. re_send_data = re_head_data + re_body_data
  553. methods.debug_log(f"{self.connection_id}|Connection_e1:515", f"re_command_id: {re_command_id}")
  554. item.client.write(re_send_data)
  555. def message2014(self, body_data):
  556. # --- 监听 2014
  557. """
  558. message CSState
  559. {
  560. UserState state=1;
  561. int32 uid=2;
  562. };
  563. """
  564. o1 = protobuf.CSState()
  565. o1.ParseFromString(body_data)
  566. methods.debug_log(f"{self.connection_id}|Connection_e1:572", f"state: {o1.state}")
  567. methods.debug_log(f"{self.connection_id}|Connection_e1:572", f"uid: {o1.uid}")
  568. # --- send 4016 发送全部舱端
  569. for item in clients.values():
  570. if not item.client_type:
  571. continue
  572. if item.client_type == 'cockpit':
  573. """
  574. message SCState
  575. {
  576. UserState state=1;
  577. int32 uid=2;
  578. };
  579. """
  580. o2 = protobuf.SCState()
  581. o2.state = o1.state
  582. o2.uid = o1.uid # 车端id
  583. re_command_id = protobuf.SC_State # 4022
  584. re_body_length = o2.ByteSize()
  585. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  586. re_body_data = o2.SerializeToString()
  587. re_send_data = re_head_data + re_body_data
  588. methods.debug_log(f"{self.connection_id}|Connection_e1:551", f"re_command_id: {re_command_id}")
  589. item.client.write(re_send_data)
  590. # --- send 6011 指发送车端操作用户的id
  591. for item in clients.values():
  592. if not item.client_info:
  593. continue
  594. if not item.client_info.get('local_connection_id'):
  595. continue
  596. if item.client_info.get('local_connection_id') == o1.uid:
  597. """
  598. Leave: 消息体
  599. Leave.peer: int32(车端rid)
  600. Leave.egotype: int32
  601. """
  602. o3 = protobuf.UserActivityInfo()
  603. o3.user_uuid = self.client_info.get('user_uuid') # 用户uuid
  604. o3.cockpit_id = self.connection_id # 舱端id
  605. o3.vehicle_id = o1.uid # 车端id
  606. re_command_id = protobuf.S2V_SendUserInfo # 6011
  607. re_body_length = o3.ByteSize()
  608. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  609. re_body_data = o3.SerializeToString()
  610. re_send_data = re_head_data + re_body_data
  611. methods.debug_log(f"{self.connection_id}|Connection_e1:664", f"re_command_id: {re_command_id}")
  612. item.client.write(re_send_data)
  613. @staticmethod
  614. async def check_clients():
  615. """
  616. 剔除掉线连接
  617. """
  618. count = 0
  619. while True:
  620. count += 1
  621. print(f"Connection_e1:689|count: {count}", flush=True)
  622. now_at = methods.now_ts()
  623. for connection_id in list(clients.keys()):
  624. object = clients.get(connection_id)
  625. print(f"Connection_e1:689|client: {object.client}, update_at: {object.update_at}, "
  626. f"client_type: {object.client_type}", flush=True)
  627. await asyncio.sleep(3) # 发送消息的间隔时间
  628. @staticmethod
  629. async def run():
  630. """
  631. """
  632. # --- define ---
  633. loop = asyncio.get_running_loop()
  634. server = await loop.create_server(
  635. lambda: SRIConnection(),
  636. '0.0.0.0', Global.egoserver_port
  637. )
  638. # --- start ---
  639. async with server:
  640. print(f"Connection_e1:713|Server listening on 0.0.0.0:{Global.egoserver_port}", flush=True)
  641. # await loop.create_task(Connection_e1:.check_clients())
  642. await server.serve_forever()
  643. if __name__ == '__main__':
  644. asyncio.run(SRIConnection.run())