""" todo 1、存在消息被异步多次处理的问题 """ # from hub import methods, Global import struct import asyncio import time import socket import sys import importlib # --- for linux sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01') sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-pysdk') protobuf = importlib.import_module(f"xprotobuf.protocol_pb2") methods = importlib.import_module(f"xlib") clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)} serial_rid_dict = { '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000, '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001, 'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002, 'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002, 'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004, # 正在使用 } account_uid_dict = { 'ego': 3 } class SRIConnection(asyncio.Protocol): """""" head_sequence = '<hh' # 字节序规则 head_size = struct.calcsize(head_sequence) message_data = b'' def connection_made(self, client): """ 建立客户端连接 """ peername = client.get_extra_info('peername') self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 12700130000 客户端id self.client = client # self.data = b'' self.client_type = None self.client_info = None self.update_at = methods.now_ts() clients[self.connection_id] = self # 获取底层 socket # sock = self.client.get_extra_info('socket') # 查看缓冲区大小 # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Receive buffer size: {recv_buffer_size}") # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Send buffer size: {send_buffer_size}") def connection_lost(self, exc): """ 关闭连接 """ methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭") # --- 处理车端掉线,通知所有舱端 if self.client_type == 'vehicle': """ SCDelRobot: 消息体 SCDelRobot.peer: int32 SCDelRobot.egotype: int32 """ o2 = protobuf.SCDelRobot() o2.peer = self.client_info.get('rid') # 车端唯一标识 o2.egotype = self.client_info.get('egotype') re_command_id = protobuf.SC_NotifyDel # 4017 re_body_length = o2.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o2.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {re_command_id}") for item in clients.values(): if item.client_type == 'cockpit': item.client.write(re_send_data) # --- clean if self.connection_id in clients: del clients[self.connection_id] def data_received(self, data): """ 消息处理 """ # methods.debug_log(f'{self.connection_id}|SRIConnection96', f"data length: {len(data)}") asyncio.create_task(self.handle_data(data)) async def handle_data(self, data): """ 消息处理 """ # methods.debug_log(f'{self.connection_id}|SRIConnection102', f"data length: {len(data)}") loop = asyncio.get_running_loop() await loop.run_in_executor(None, self.process_data, data) def process_data(self, data): """ 消息处理 """ try: methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}") self.message_data += data # 执行了这个以后 methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1") methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2") methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3") count = 0 while True: # --- print count += 1 methods.debug_log(f"{self.connection_id}|SRIConnection123", f"---------------------------------- while count: {count}") # 确保有足够的字节来处理消息头 if len(self.message_data) < self.head_size: break # 获取消息头 head_data = self.message_data[:self.head_size] command_id, body_length = struct.unpack(self.head_sequence, head_data) methods.debug_log(f'{self.connection_id}|SRIConnection130', f"command_id: {command_id}, body_length: {body_length}") # 检查命令ID是否有效 if not (2000 <= command_id < 5000): self.message_data = b'' # 清空无效数据 break # 检查完整消息是否接收完毕 total_length = self.head_size + body_length if len(self.message_data) < total_length: break # 数据不完整,等待更多数据 # 调用相应处理方法 method = getattr(self, f'message{command_id}', None) if method: method(self.message_data[self.head_size:total_length]) # 处理完整消息 else: methods.debug_log(f"{self.connection_id}|SRIConnectionError", f"No handler for command_id: {command_id}") # 移除已处理的消息 self.message_data = self.message_data[total_length:] except Exception as exception: methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}") methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}") def message2008(self, body_data): # methods.debug_log(f"{self.connection_id}|SRIConnection80", f"message: 2008") pass def message2009(self, body_data): # --- 监听 2009 """ CSAdd: 消息体 CSAdd.serial: string CSAdd.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car CSAdd.name: string """ object0 = protobuf.CSAdd() object0.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#serial: {object0.serial}") methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {object0.name}") methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {object0.type}") # --- update --- self.client_type = 'vehicle' self.client_info = { 'rid': serial_rid_dict.get(object0.serial), 'name': object0.name, 'serial': object0.serial, 'egotype': 2, # 车端类型 } # --- send 4016 """ SCAddRobot: 消息体 SCAddRobot.robot: Robot Robot: 消息体 Robot.rid: int32 Robot.name: string Robot.type: int32 Robot.state: RobotState Offline Online Busy """ o1 = protobuf.Robot() o1.rid = self.client_info.get('rid') o1.name = object0.name o1.type = 2 # 0 EgoType::None 1 EgoType::User 2 EgoType::Car o1.state = protobuf.Robot.Online o2 = protobuf.SCAddRobot() o2.robot.CopyFrom(o1) re_command_id = protobuf.SC_NotifyAdd # 4016 re_body_length = o2.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o2.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection136", f"sendmessage: {re_command_id}") # --- send 4016 发送全部舱端 for item in clients.values(): if item.client_type == 'cockpit': item.client.write(re_send_data) # --- send 4007 todo 凯强说并未用到 # o1 = protobuf.SCAdd() # o1.ret = True # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车 # o1.name = object0.name # re_command_id = protobuf.SC_Add # 4007 # re_body_length = o1.ByteSize() # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) # re_body_data = o1.SerializeToString() # re_send_data = re_head_data + re_body_data # methods.debug_log(f"{self.connection_id} | SRIConnection150", f"re_command_id: {re_command_id}") # self.client.write(re_send_data) def message2000(self, body_data): # --- 解析消息体 object = protobuf.CSSign() object.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {object.account}") methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#password: {object.password}") # --- update --- self.client_type = 'cockpit' self.client_info = { 'uid': 3, # 对应数据库里的ego的id 'name': 'ego', # 对应数据库里 'egotype': 1, # 舱端类型 } # --- send 4000 object = protobuf.SCSign() object.ret = True object.uid = self.client_info.get('uid') object.name = self.client_info.get('name') re_command_id = protobuf.SC_Sign # 4000 re_body_length = object.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = object.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}") self.client.write(re_send_data) def message2010(self, body_data): # --- send 4008 发送全部车端信息列表 """ Robot: 消息体 Robot.rid: string Robot.name: string Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car Robot.state: enum Offline Online Busy """ o2 = protobuf.SCRobot() for item in clients.values(): if item.client_type and item.client_type == 'vehicle': o1 = protobuf.Robot() o1.rid = item.client_info.get('rid') o1.name = item.client_info.get('name') o1.type = 2 # EgoType::Car o1.state = protobuf.Robot.RobotState.Value('Online') o2.robot.add().CopyFrom(o1) re_command_id = protobuf.SC_Robot # 4008 re_body_length = o2.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o2.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}") self.client.write(re_send_data) def message2001(self, body_data): # --- 解析消息体 """ CSReq: 消息体 CSReq.peer: int32(rid,车端唯一标识) CSReq.index: int32(相机位置,RenderPosition) CSReq.egotype: int32(终端类型,舱端/车端) """ o1 = protobuf.CSReq() o1.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.peer}") methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.index}") # methods.debug_log(f"{self.connection_id}|SRIConnection235.message2001", # f"#=============================index: {o1.index}") methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#egotype: {o1.egotype}") methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#uid: {self.client_info.get('uid')}") methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#rid: {self.client_info.get('rid')}") # --- send 4009 指定车端 for item in clients.values(): if item.client_info.get('rid') and item.client_info.get('rid') == o1.peer: """ CSReq: 消息体 CSReq.peer: int32(rid,车端唯一标识) CSReq.index: int32(相机位置,RenderPosition) CSReq.egotype: int32(终端类型,舱端/车端) """ o2 = protobuf.CSReq() o2.peer = self.client_info.get('uid') o2.index = o1.index o2.egotype = o1.egotype re_command_id = protobuf.SC_NotifyReq # 4009 re_body_length = o2.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o2.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}") item.client.write(re_send_data) def message2002(self, body_data): # --- 解析消息体 """ CSRep: 消息体 CSRep.desc: VideoDesc CSRep.peer: int32 CSRep.index: int32 CSRep.egotype: int32 """ o1 = protobuf.CSRep() o1.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#desc: {o1.desc}") methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {o1.peer}") methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#index: {o1.index}") methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#egotype: {o1.egotype}") methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {self.client_info.get('uid')}") methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {self.client_info.get('rid')}") # --- send 4010 通知指定舱端 for item in clients.values(): if item.client_info.get('uid') and item.client_info.get('uid') == o1.peer: o2 = protobuf.CSRep() o2.desc = o1.desc o2.peer = self.client_info.get('rid') o2.index = o1.index o2.egotype = o1.egotype re_command_id = protobuf.SC_NotifyRep # 4010 re_body_length = o2.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o2.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}") item.client.write(re_send_data) def message2004(self, body_data): # --- 解析消息体 """ Offer: 消息体 Offer.index: int32 Offer.peer: int32(车端rid) Offer.type: string Offer.sdp: string """ object = protobuf.Offer() object.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {object.peer}") methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {self.client_info.get('uid')}") methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {self.client_info.get('rid')}") # --- send 4012 指定车端 for item in clients.values(): if item.client_info.get('rid') and item.client_info.get('rid') == object.peer: o1 = protobuf.Offer() o1.index = object.index o1.peer = self.client_info.get('uid') # 将舱端id赋值到指定车端对象 o1.type = object.type o1.sdp = object.sdp re_command_id = protobuf.SC_NotifyOffer # 4012 re_body_length = o1.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o1.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}") item.client.write(re_send_data) def message2005(self, body_data): # --- 解析消息体 """ Answer: 消息体 Answer.index: int32 Answer.peer: int32(舱端uid) Answer.type: string Answer.sdp: string """ object = protobuf.Answer() object.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {object.peer}") methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {self.client_info.get('uid')}") methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {self.client_info.get('rid')}") # --- send 4011 指定舱端 count = 0 for item in clients.values(): count += 1 methods.debug_log(f"{self.connection_id}|SRIConnection428", f"[{count}]item.client_info: {item.client_info}") if item.client_info.get('uid') and item.client_info.get('uid') == object.peer: o1 = protobuf.Answer() o1.index = object.index o1.peer = self.client_info.get('rid') # 将车端id赋值到指定舱端对象, o1.type = object.type o1.sdp = object.sdp re_command_id = protobuf.SC_NotifyAnswer # 4011 re_body_length = o1.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o1.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}") methods.debug_log(f"{self.connection_id}|SRIConnection428", f"=====================================================-----index: {o1.index}") item.client.write(re_send_data) def message2006(self, body_data): # --- 解析消息体 """ Candidate: 消息体 Candidate.index: int32 Candidate.peer: int32(车端rid) Candidate.type: string Candidate.candidate: string Candidate.sdpMLineIndex: int32 Candidate.sdpMid: string Candidate.egotype: int32 """ object = protobuf.Candidate() object.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {object.peer}") methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#candidate.in: {object.candidate}") methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {self.client_info.get('uid')}") methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {self.client_info.get('rid')}") for item in clients.values(): # --- send 4013 舱端到车端 if item.client_info.get('rid') and item.client_info.get('rid') == object.peer: o1 = protobuf.Candidate() o1.index = object.index o1.peer = self.client_info.get('uid') # 将舱端id赋值到指定车端对象, o1.type = object.type o1.candidate = object.candidate o1.sdpMLineIndex = object.sdpMLineIndex o1.sdpMid = object.sdpMid o1.egotype = object.egotype re_command_id = protobuf.SC_NotifyCandidate # 4013 re_body_length = o1.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o1.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}") methods.debug_log(f"{self.connection_id}|SRIConnection409", f"#candidate.out: {o1.candidate}") item.client.write(re_send_data) # --- send 4013 车端到舱端 if item.client_info.get('uid') and item.client_info.get('uid') == object.peer: o1 = protobuf.Candidate() o1.index = object.index o1.peer = self.client_info.get('rid') # 将车端id赋值到指定舱端对象, o1.type = object.type o1.candidate = object.candidate o1.sdpMLineIndex = object.sdpMLineIndex o1.sdpMid = object.sdpMid o1.egotype = object.egotype re_command_id = protobuf.SC_NotifyCandidate # 4013 re_body_length = o1.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o1.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}") methods.debug_log(f"{self.connection_id}|SRIConnection426", f"#candidate.out: {o1.candidate}") item.client.write(re_send_data) def message2007(self, body_data): # --- 解析消息体 """ Leave: 消息体 Leave.peer: int32(车端rid) Leave.egotype: int32 """ o1 = protobuf.Leave() o1.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {o1.peer}") # --- send 4009 指定车端 for item in clients.values(): if item.client_info.get('rid') and item.client_info.get('rid') == o1.peer: """ Leave: 消息体 Leave.peer: int32(车端rid) Leave.egotype: int32 """ o2 = protobuf.Leave() o2.peer = self.client_info.get('uid') o2.egotype = o1.egotype re_command_id = protobuf.SC_NotifyLeave # 4014 re_body_length = o2.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o2.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}") item.client.write(re_send_data) def message2014(self, body_data): # --- 解析消息体 """ message CSState { UserState state=1; int32 uid=2; }; """ o1 = protobuf.CSState() o1.ParseFromString(body_data) methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#state: {o1.state}") methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#uid: {o1.uid}") # --- send 4016 发送全部舱端 for item in clients.values(): if item.client_type == 'cockpit': """ message SCState { UserState state=1; int32 uid=2; }; """ o2 = protobuf.SCState() o2.state = o1.state o2.uid = self.client_info.get('rid') re_command_id = protobuf.SC_State # 4022 re_body_length = o2.ByteSize() re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length) re_body_data = o2.SerializeToString() re_send_data = re_head_data + re_body_data methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}") item.client.write(re_send_data) @staticmethod async def check_clients(): """ 剔除掉线连接 """ count = 0 while True: count += 1 print(f"#count: {count}", flush=True) now_at = methods.now_ts() for connection_id in list(clients.keys()): object = clients.get(connection_id) print(f"#client: {object.client}, #update_at: {object.update_at}, #client_type: {object.client_type}", flush=True) await asyncio.sleep(3) # 发送消息的间隔时间 @staticmethod async def run(): """ """ # --- define --- loop = asyncio.get_running_loop() server = await loop.create_server( lambda: SRIConnection(), '0.0.0.0', 20917 ) # --- start --- async with server: print("Server listening on 0.0.0.0:20917", flush=True) # await loop.create_task(SRIConnection.check_clients()) await server.serve_forever() if __name__ == '__main__': asyncio.run(SRIConnection.run())