Connection_e1.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. """
  2. """
  3. from hub import methods, Global
  4. from werkzeug.security import check_password_hash
  5. import struct
  6. import asyncio
  7. import time
  8. import socket
  9. import sys
  10. import importlib
  11. # --- for linux
  12. # sys.path.append('/home/sri/repositories/repositories/SRI-DINO.Server-py/sri-server-bg03')
  13. # sys.path.append('/home/sri/repositories/repositories/SRI-DINO.Server-py/sri-pysdk')
  14. # --- for windows
  15. sys.path.append(r'C:\SRI-DINO.Server-py\sri-server-bg03')
  16. sys.path.append(r'C:\SRI-DINO.Server-py\sri-pysdk')
  17. protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
  18. methods = importlib.import_module(f"xlib")
  19. clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  20. serial_rid_dict = {
  21. '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
  22. '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
  23. 'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
  24. 'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
  25. 'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004, # 正在使用
  26. }
  27. account_uid_dict = {
  28. 'ego': 3
  29. }
  30. # live_relationship = {} # {<id-1>id-2>: True}
  31. class SRIConnection(asyncio.Protocol):
  32. """"""
  33. head_sequence = '<hh' # 字节序规则
  34. head_size = struct.calcsize(head_sequence)
  35. message_data = b''
  36. def __init__(self):
  37. self.lock = asyncio.Lock() # 初始化锁
  38. def connection_made(self, client):
  39. """
  40. 建立客户端连接
  41. """
  42. peername = client.get_extra_info('peername')
  43. # self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 客户端id
  44. self.connection_id = int(f"{peername[1]}") # 客户端id(公网情况)
  45. # self.connection_id = int(f"{peername[0].replace('.', '')}") # 客户端id(固定ip情况)
  46. self.client = client
  47. # self.data = b''
  48. self.client_type = None
  49. self.client_info = None
  50. self.update_at = methods.now_ts()
  51. clients[self.connection_id] = self
  52. # 获取底层 socket
  53. # sock = self.client.get_extra_info('socket')
  54. # 查看缓冲区大小
  55. # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
  56. # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
  57. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Receive buffer size: {recv_buffer_size}")
  58. # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Send buffer size: {send_buffer_size}")
  59. def connection_lost(self, exc):
  60. """
  61. 关闭连接
  62. """
  63. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭")
  64. # --- 处理车端掉线,通知所有舱端
  65. if self.client_type == 'vehicle':
  66. """
  67. SCDelRobot: 消息体
  68. SCDelRobot.peer: int32
  69. SCDelRobot.egotype: int32
  70. """
  71. o2 = protobuf.SCDelRobot()
  72. o2.peer = self.client_info.get('connection_id') # 车端id
  73. o2.egotype = self.client_info.get('egotype')
  74. re_command_id = protobuf.SC_NotifyDel # 4017
  75. re_body_length = o2.ByteSize()
  76. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  77. re_body_data = o2.SerializeToString()
  78. re_send_data = re_head_data + re_body_data
  79. methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {re_command_id}")
  80. for item in clients.values():
  81. if item.client_type == 'cockpit':
  82. item.client.write(re_send_data)
  83. # --- 处理舱端掉线,通知所有车端
  84. if self.client_type == 'cockpit':
  85. """
  86. Leave: 消息体
  87. Leave.peer: int32(车端rid)
  88. Leave.egotype: int32
  89. """
  90. o2 = protobuf.Leave()
  91. o2.peer = self.client_info.get('connection_id') # 舱端id
  92. o2.egotype = 1 # 客户端类型
  93. re_command_id = protobuf.SC_NotifyLeave # 4014
  94. re_body_length = o2.ByteSize()
  95. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  96. re_body_data = o2.SerializeToString()
  97. re_send_data = re_head_data + re_body_data
  98. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  99. for item in clients.values():
  100. if item.client_type == 'vehicle':
  101. item.client.write(re_send_data)
  102. # --- clean
  103. if self.connection_id in clients:
  104. del clients[self.connection_id]
  105. # --- clean live_relationship
  106. # keys = [key for key in live_relationship.keys() if self.connection_id in key]
  107. # for key in keys:
  108. # del live_relationship[key]
  109. def data_received(self, data):
  110. """
  111. 消息处理
  112. """
  113. asyncio.create_task(self.handle_data(data))
  114. async def handle_data(self, data):
  115. """
  116. 消息处理
  117. """
  118. # --- before
  119. # loop = asyncio.get_running_loop()
  120. # await loop.run_in_executor(None, self.process_data, data)
  121. # async with self.lock: # 加入锁,确保只有一个任务在处理数据
  122. # await self.process_data(data)
  123. async with self.lock: # 加入锁,确保只有一个任务在处理数据
  124. loop = asyncio.get_running_loop()
  125. await loop.run_in_executor(None, self.process_data, data) # 调用非异步函数
  126. def process_data(self, data):
  127. """
  128. 消息处理
  129. """
  130. try:
  131. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}")
  132. self.message_data += data # 执行了这个以后
  133. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1")
  134. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2")
  135. # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3")
  136. # count = 0
  137. while True:
  138. # --- print
  139. # count += 1
  140. # methods.debug_log(f"{self.connection_id}|SRIConnection123",
  141. # f"---------------------------------- while count: {count}")
  142. # 确保有足够的字节来处理消息头
  143. if len(self.message_data) < self.head_size:
  144. break
  145. # 获取消息头
  146. head_data = self.message_data[:self.head_size]
  147. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  148. if command_id not in [2008]:
  149. methods.debug_log(f'{self.connection_id}|SRIConnection176',
  150. f"command_id: {command_id}, body_length: {body_length}")
  151. # 检查命令ID是否有效
  152. if not (2000 <= command_id < 5000):
  153. self.message_data = b'' # 清空无效数据
  154. break
  155. # 检查完整消息是否接收完毕
  156. total_length = self.head_size + body_length
  157. if len(self.message_data) < total_length:
  158. break # 数据不完整,等待更多数据
  159. # 调用相应处理方法
  160. method = getattr(self, f'message{command_id}', None)
  161. if method:
  162. method(self.message_data[self.head_size:total_length]) # 处理完整消息
  163. else:
  164. methods.debug_log(f"{self.connection_id}|SRIConnectionError",
  165. f"No handler for command_id: {command_id}")
  166. # 移除已处理的消息
  167. self.message_data = self.message_data[total_length:]
  168. except Exception as exception:
  169. methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
  170. methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")
  171. def message2008(self, body_data):
  172. # methods.debug_log(f"{self.connection_id}|SRIConnection80", f"message: 2008")
  173. pass
  174. def message2009(self, body_data):
  175. # --- 监听 2009
  176. """
  177. CSAdd: 消息体
  178. CSAdd.serial: string
  179. CSAdd.type: int32 EgoType::Car | 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  180. CSAdd.name: string
  181. """
  182. object0 = protobuf.CSAdd()
  183. object0.ParseFromString(body_data)
  184. methods.debug_log(f"{self.connection_id}|SRIConnection219", f"#serial: {object0.serial}")
  185. # methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {object0.name}")
  186. # methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {object0.type}")
  187. # --- update ---
  188. self.client_type = 'vehicle'
  189. self.client_info = {
  190. 'connection_id': self.connection_id,
  191. 'rid': serial_rid_dict.get(object0.serial),
  192. 'name': object0.name,
  193. 'serial': object0.serial,
  194. 'egotype': 2, # 客户端类型
  195. }
  196. """
  197. SCAddRobot: 消息体
  198. SCAddRobot.robot: Robot
  199. Robot: 消息体
  200. Robot.rid: int32
  201. Robot.name: string
  202. Robot.type: int32
  203. Robot.state: RobotState Offline Online Busy
  204. """
  205. o1 = protobuf.Robot()
  206. # o1.rid = self.client_info.get('rid')
  207. o1.rid = self.client_info.get('connection_id')
  208. o1.name = object0.name
  209. o1.type = 2 # 0 EgoType::None 1 EgoType::User 2 EgoType::Car
  210. o1.state = protobuf.Robot.Online
  211. o2 = protobuf.SCAddRobot()
  212. o2.robot.CopyFrom(o1)
  213. re_command_id = protobuf.SC_NotifyAdd # 4016
  214. re_body_length = o2.ByteSize()
  215. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  216. re_body_data = o2.SerializeToString()
  217. re_send_data = re_head_data + re_body_data
  218. # --- send 4016 发送全部舱端
  219. for item in clients.values():
  220. if item.client_type == 'cockpit':
  221. methods.debug_log(f"{self.connection_id}|SRIConnection136", f"re_command_id: {re_command_id}")
  222. item.client.write(re_send_data)
  223. # --- send 4007 todo 凯强说并未用到
  224. # o1 = protobuf.SCAdd()
  225. # o1.ret = True
  226. # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车
  227. # o1.name = object0.name
  228. # re_command_id = protobuf.SC_Add # 4007
  229. # re_body_length = o1.ByteSize()
  230. # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  231. # re_body_data = o1.SerializeToString()
  232. # re_send_data = re_head_data + re_body_data
  233. # methods.debug_log(f"{self.connection_id} | SRIConnection150", f"re_command_id: {re_command_id}")
  234. # self.client.write(re_send_data)
  235. def message2000(self, body_data):
  236. # --- 解析消息体
  237. object = protobuf.CSSign()
  238. object.ParseFromString(body_data)
  239. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {object.account}")
  240. methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#password: {object.password}")
  241. # --- update ---
  242. ret = True
  243. if object.account == "Ego" and object.password != '123456':
  244. ret = False
  245. # --- check ---
  246. user = Global.mdb.get_one('UserInfo', {'username': object.account})
  247. if not user:
  248. ret = False
  249. elif not check_password_hash(user['password'], object.password):
  250. ret = False
  251. else:
  252. ret = True
  253. # --- update ---
  254. if ret:
  255. self.client_type = 'cockpit'
  256. self.client_info = {
  257. 'connection_id': self.connection_id,
  258. 'uid': 3, # 对应数据库里的ego的id
  259. 'name': 'ego', # 对应数据库里
  260. 'egotype': 1, # 舱端类型
  261. }
  262. # --- send 4000
  263. object = protobuf.SCSign()
  264. object.ret = ret
  265. object.uid = self.client_info.get('connection_id')
  266. object.name = self.client_info.get('name')
  267. re_command_id = protobuf.SC_Sign # 4000
  268. re_body_length = object.ByteSize()
  269. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  270. re_body_data = object.SerializeToString()
  271. re_send_data = re_head_data + re_body_data
  272. methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}")
  273. self.client.write(re_send_data)
  274. def message2010(self, body_data):
  275. # --- send 4008 发送全部车端信息列表
  276. """
  277. Robot: 消息体
  278. Robot.rid: string
  279. Robot.name: string
  280. Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
  281. Robot.state: enum Offline Online Busy
  282. """
  283. # --- 获取全部在线车辆列表
  284. o2 = protobuf.SCRobot()
  285. for item in clients.values():
  286. if item.client_type and item.client_type == 'vehicle':
  287. o1 = protobuf.Robot()
  288. # o1.rid = item.client_info.get('rid')
  289. o1.rid = item.client_info.get('connection_id')
  290. o1.name = item.client_info.get('name')
  291. o1.type = 2 # EgoType::Car
  292. o1.state = protobuf.Robot.RobotState.Value('Online')
  293. o2.robot.add().CopyFrom(o1)
  294. re_command_id = protobuf.SC_Robot # 4008
  295. re_body_length = o2.ByteSize()
  296. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  297. re_body_data = o2.SerializeToString()
  298. re_send_data = re_head_data + re_body_data
  299. methods.debug_log(f"{self.connection_id}|SRIConnection306", f"re_command_id: {re_command_id}")
  300. self.client.write(re_send_data)
  301. def message2001(self, body_data):
  302. # --- 解析消息体
  303. """
  304. CSReq: 消息体
  305. CSReq.peer: int32(rid,车端唯一标识)
  306. CSReq.index: int32(相机位置,RenderPosition)
  307. CSReq.egotype: int32(终端类型,舱端/车端)
  308. """
  309. o1 = protobuf.CSReq()
  310. o1.ParseFromString(body_data)
  311. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.peer}")
  312. methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#index: {o1.index}")
  313. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#uid: {self.client_info.get('uid')}")
  314. # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#rid: {self.client_info.get('rid')}")
  315. # --- send 4009 指定车端
  316. for item in clients.values():
  317. if item.client_info.get('connection_id') == o1.peer:
  318. """
  319. CSReq: 消息体
  320. CSReq.peer: int32(rid,车端唯一标识)
  321. CSReq.index: int32(相机位置,RenderPosition)
  322. CSReq.egotype: int32(终端类型,舱端/车端)
  323. """
  324. o2 = protobuf.CSReq()
  325. o2.peer = self.client_info.get('connection_id') # 舱端id
  326. o2.index = o1.index
  327. o2.egotype = o1.egotype
  328. re_command_id = protobuf.SC_NotifyReq # 4009
  329. re_body_length = o2.ByteSize()
  330. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  331. re_body_data = o2.SerializeToString()
  332. re_send_data = re_head_data + re_body_data
  333. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  334. item.client.write(re_send_data)
  335. def message2002(self, body_data):
  336. # --- 解析消息体
  337. """
  338. CSRep: 消息体
  339. CSRep.desc: VideoDesc
  340. CSRep.peer: int32
  341. CSRep.index: int32
  342. CSRep.egotype: int32
  343. """
  344. o1 = protobuf.CSRep()
  345. o1.ParseFromString(body_data)
  346. methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {o1.peer}")
  347. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {self.client_info.get('uid')}")
  348. # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {self.client_info.get('rid')}")
  349. # --- send 4010 指定舱端
  350. for item in clients.values():
  351. if item.client_info.get('connection_id') == o1.peer:
  352. o2 = protobuf.CSRep()
  353. o2.desc = o1.desc
  354. o2.peer = self.client_info.get('connection_id') # 车端id
  355. o2.index = o1.index
  356. o2.egotype = o1.egotype
  357. re_command_id = protobuf.SC_NotifyRep # 4010
  358. re_body_length = o2.ByteSize()
  359. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  360. re_body_data = o2.SerializeToString()
  361. re_send_data = re_head_data + re_body_data
  362. methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
  363. item.client.write(re_send_data)
  364. def message2004(self, body_data):
  365. # --- 解析消息体
  366. """
  367. Offer: 消息体
  368. Offer.index: int32
  369. Offer.peer: int32(车端rid)
  370. Offer.type: string
  371. Offer.sdp: string
  372. """
  373. object = protobuf.Offer()
  374. object.ParseFromString(body_data)
  375. methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {object.peer}")
  376. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {self.client_info.get('uid')}")
  377. # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {self.client_info.get('rid')}")
  378. # --- send 4012 指定车端
  379. for item in clients.values():
  380. if item.client_info.get('connection_id') == object.peer:
  381. o1 = protobuf.Offer()
  382. o1.index = object.index
  383. o1.peer = self.client_info.get('connection_id') # 舱端id
  384. o1.type = object.type
  385. o1.sdp = object.sdp
  386. re_command_id = protobuf.SC_NotifyOffer # 4012
  387. re_body_length = o1.ByteSize()
  388. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  389. re_body_data = o1.SerializeToString()
  390. re_send_data = re_head_data + re_body_data
  391. methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}")
  392. item.client.write(re_send_data)
  393. def message2005(self, body_data):
  394. # --- 解析消息体
  395. """
  396. Answer: 消息体
  397. Answer.index: int32
  398. Answer.peer: int32(舱端uid)
  399. Answer.type: string
  400. Answer.sdp: string
  401. """
  402. object = protobuf.Answer()
  403. object.ParseFromString(body_data)
  404. methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {object.peer}")
  405. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {self.client_info.get('uid')}")
  406. # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {self.client_info.get('rid')}")
  407. # --- send 4011 指定舱端
  408. for item in clients.values():
  409. if item.client_info.get('connection_id') == object.peer:
  410. o1 = protobuf.Answer()
  411. o1.index = object.index
  412. o1.peer = self.client_info.get('connection_id') # 车端id
  413. o1.type = object.type
  414. o1.sdp = object.sdp
  415. re_command_id = protobuf.SC_NotifyAnswer # 4011
  416. re_body_length = o1.ByteSize()
  417. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  418. re_body_data = o1.SerializeToString()
  419. re_send_data = re_head_data + re_body_data
  420. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}")
  421. methods.debug_log(f"{self.connection_id}|SRIConnection428", f"#index: {o1.index}")
  422. item.client.write(re_send_data)
  423. def message2006(self, body_data):
  424. # --- 解析消息体
  425. """
  426. Candidate: 消息体
  427. Candidate.index: int32
  428. Candidate.peer: int32(车端rid)
  429. Candidate.type: string
  430. Candidate.candidate: string
  431. Candidate.sdpMLineIndex: int32
  432. Candidate.sdpMid: string
  433. Candidate.egotype: int32
  434. """
  435. object = protobuf.Candidate()
  436. object.ParseFromString(body_data)
  437. methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {object.peer}")
  438. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {self.client_info.get('uid')}")
  439. # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {self.client_info.get('rid')}")
  440. for item in clients.values():
  441. # --- send 4013 舱端到车端
  442. if item.client_info.get('connection_id') == object.peer:
  443. o1 = protobuf.Candidate()
  444. o1.index = object.index
  445. o1.peer = self.client_info.get('connection_id') # 舱端id,
  446. o1.type = object.type
  447. o1.candidate = object.candidate
  448. o1.sdpMLineIndex = object.sdpMLineIndex
  449. o1.sdpMid = object.sdpMid
  450. o1.egotype = object.egotype
  451. re_command_id = protobuf.SC_NotifyCandidate # 4013
  452. re_body_length = o1.ByteSize()
  453. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  454. re_body_data = o1.SerializeToString()
  455. re_send_data = re_head_data + re_body_data
  456. methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}")
  457. item.client.write(re_send_data)
  458. # --- send 4013 车端到舱端
  459. if item.client_info.get('connection_id') == object.peer:
  460. o1 = protobuf.Candidate()
  461. o1.index = object.index
  462. o1.peer = self.client_info.get('connection_id') # 车端id,
  463. o1.type = object.type
  464. o1.candidate = object.candidate
  465. o1.sdpMLineIndex = object.sdpMLineIndex
  466. o1.sdpMid = object.sdpMid
  467. o1.egotype = object.egotype
  468. re_command_id = protobuf.SC_NotifyCandidate # 4013
  469. re_body_length = o1.ByteSize()
  470. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  471. re_body_data = o1.SerializeToString()
  472. re_send_data = re_head_data + re_body_data
  473. methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}")
  474. # methods.debug_log(f"{self.connection_id}|SRIConnection426", f"#candidate.out: {o1.candidate}")
  475. item.client.write(re_send_data)
  476. def message2007(self, body_data):
  477. # --- 解析消息体
  478. """
  479. Leave: 消息体
  480. Leave.peer: int32(车端rid)
  481. Leave.egotype: int32
  482. """
  483. o1 = protobuf.Leave()
  484. o1.ParseFromString(body_data)
  485. methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {o1.peer}")
  486. # --- send 4009 指定车端
  487. for item in clients.values():
  488. if item.client_info.get('connection_id') == o1.peer:
  489. """
  490. Leave: 消息体
  491. Leave.peer: int32(车端rid)
  492. Leave.egotype: int32
  493. """
  494. o2 = protobuf.Leave()
  495. o2.peer = self.client_info.get('connection_id') # 舱端id
  496. o2.egotype = o1.egotype
  497. re_command_id = protobuf.SC_NotifyLeave # 4014
  498. re_body_length = o2.ByteSize()
  499. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  500. re_body_data = o2.SerializeToString()
  501. re_send_data = re_head_data + re_body_data
  502. methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
  503. item.client.write(re_send_data)
  504. def message2014(self, body_data):
  505. # --- 解析消息体
  506. """
  507. message CSState
  508. {
  509. UserState state=1;
  510. int32 uid=2;
  511. };
  512. """
  513. o1 = protobuf.CSState()
  514. o1.ParseFromString(body_data)
  515. methods.debug_log(f"{self.connection_id}|SRIConnection|572", f"#state: {o1.state}")
  516. methods.debug_log(f"{self.connection_id}|SRIConnection|572", f"#uid: {o1.uid}")
  517. # --- send 4016 发送全部舱端
  518. for item in clients.values():
  519. if item.client_type == 'cockpit':
  520. """
  521. message SCState
  522. {
  523. UserState state=1;
  524. int32 uid=2;
  525. };
  526. """
  527. o2 = protobuf.SCState()
  528. o2.state = o1.state
  529. o2.uid = o1.uid # 车端id
  530. re_command_id = protobuf.SC_State # 4022
  531. re_body_length = o2.ByteSize()
  532. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  533. re_body_data = o2.SerializeToString()
  534. re_send_data = re_head_data + re_body_data
  535. methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}")
  536. item.client.write(re_send_data)
  537. # --- send 6011 指发送车端操作用户的id
  538. for item in clients.values():
  539. if item.client_info.get('connection_id') == o1.uid:
  540. """
  541. Leave: 消息体
  542. Leave.peer: int32(车端rid)
  543. Leave.egotype: int32
  544. """
  545. o3 = protobuf.UserActivityInfo()
  546. o3.user_uuid = 'SSGGSSEEFFHHWWSS' # 用户uuid
  547. o3.cockpit_id = self.connection_id # 舱端id
  548. o3.vehicle_id = 112233 # 车端id
  549. re_command_id = protobuf.S2V_SendUserInfo # 6011
  550. re_body_length = o3.ByteSize()
  551. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  552. re_body_data = o3.SerializeToString()
  553. re_send_data = re_head_data + re_body_data
  554. methods.debug_log(f"{self.connection_id}|SRIConnection611", f"re_command_id: {re_command_id}")
  555. item.client.write(re_send_data)
  556. @staticmethod
  557. async def check_clients():
  558. """
  559. 剔除掉线连接
  560. """
  561. count = 0
  562. while True:
  563. count += 1
  564. print(f"#count: {count}", flush=True)
  565. now_at = methods.now_ts()
  566. for connection_id in list(clients.keys()):
  567. object = clients.get(connection_id)
  568. print(f"#client: {object.client}, #update_at: {object.update_at}, #client_type: {object.client_type}",
  569. flush=True)
  570. await asyncio.sleep(3) # 发送消息的间隔时间
  571. @staticmethod
  572. async def run():
  573. """
  574. """
  575. # --- define ---
  576. loop = asyncio.get_running_loop()
  577. server = await loop.create_server(
  578. lambda: SRIConnection(),
  579. '0.0.0.0', 20917
  580. )
  581. # --- start ---
  582. async with server:
  583. print("Server listening on 0.0.0.0:20917", flush=True)
  584. # await loop.create_task(SRIConnection.check_clients())
  585. await server.serve_forever()
  586. if __name__ == '__main__':
  587. asyncio.run(SRIConnection.run())