"""Asyncio TCP signalling server that relays protobuf messages.

Wire format, as used by every send path in this module: a header packed
with ``SRIConnection.head_sequence`` (``command_id``, ``body_length``)
followed by ``body_length`` bytes of a serialized protobuf message.
"""
import struct
import asyncio
import time
import sys
import importlib

# --- for linux: make the project packages importable on the deployment host.
sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')

protobuf = importlib.import_module("xprotobuf.protocol_pb2")
methods = importlib.import_module("xlib")


class SRIConnection(asyncio.Protocol):
    """Protocol instance handling one client connection.

    Incoming bytes are buffered and split into frames using the length
    declared in each header; each complete frame is decoded and answered
    by ``process_data``, which ``handle_data`` runs in the event loop's
    default executor.
    """

    # struct format of the frame header: little-endian, two signed 16-bit
    # integers (command id, body length).
    head_sequence = '<hh'
    head_size = struct.calcsize(head_sequence)

    # Registry shared by all connections:
    # {<connection id>: (transport, last request timestamp, client type)}
    clients = {}

    # Event loop owning the transport; stays None when connection_made is
    # invoked outside a running loop.
    _loop = None

    def connection_made(self, client):
        """Record the transport and register this connection in ``clients``.

        Args:
            client: the transport asyncio created for this connection.
        """
        self.client = client
        self.peername = client.get_extra_info('peername')
        print(f"Connection from {self.peername}")

        # --- per-connection state
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: writes will go straight to the transport.
            self._loop = None
        self._buffer = bytearray()  # received bytes not yet framed
        self._tasks = set()  # strong references to in-flight handler tasks

        # IP with the dots removed followed by the port, e.g. 12700130000.
        # NOTE(review): distinct (ip, port) pairs can map to the same id.
        self.connection_id = f"{self.peername[0].replace('.', '')}{self.peername[1]}"

        self.clients[self.connection_id] = client, methods.now_ts(), 'type=vehicle'

    def data_received(self, data):
        """Buffer ``data`` and dispatch every complete frame it contains.

        TCP is a byte stream: one call may deliver a partial frame or
        several frames at once, so frames are cut out of the buffer using
        the body length declared in each header.
        """
        self._buffer += data
        while len(self._buffer) >= self.head_size:
            _, body_length = struct.unpack_from(self.head_sequence, self._buffer)
            if body_length < 0:
                # The next frame boundary cannot be located; drop what is
                # buffered so the next chunk is read as starting at a header.
                methods.debug_log(
                    f"{self.connection_id} | SRIConnection.data_received",
                    f"invalid body length {body_length}, discarding {len(self._buffer)} buffered bytes",
                )
                self._buffer.clear()
                return
            frame_end = self.head_size + body_length
            if len(self._buffer) < frame_end:
                return  # wait for the rest of the body
            frame = bytes(self._buffer[:frame_end])
            del self._buffer[:frame_end]

            task = asyncio.create_task(self.handle_data(frame))
            # Keep a reference until the task finishes so it cannot be
            # garbage collected mid-execution.
            self._tasks.add(task)
            task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task):
        """Forget a finished handler task and log the exception it raised, if any."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            methods.debug_log(
                f"{self.connection_id} | SRIConnection.handle_data",
                f"handler failed: {error!r}",
            )

    async def handle_data(self, data):
        """Run ``process_data(data)`` in the event loop's default executor."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.process_data, data)

    def process_data(self, data):
        """Decode one frame (header + body) and answer it.

        Args:
            data: bytes starting with a header packed with ``head_sequence``.

        Frames with a truncated header, a negative declared length or an
        unknown command id are logged and ignored.
        """
        if len(data) < self.head_size:
            methods.debug_log('SRIConnection.37', f"Truncated header: {len(data)} bytes")
            return
        command_id, body_length = struct.unpack(self.head_sequence, data[:self.head_size])
        methods.debug_log('SRIConnection.37', f"Received values: {command_id}, {body_length}")
        if body_length < 0:
            return
        # Honour the declared length so trailing bytes are never parsed as
        # part of this message.
        body_data = data[self.head_size:self.head_size + body_length]

        handlers = {
            protobuf.CS_KeepAlive: self._on_keep_alive,  # 2008
            protobuf.CS_Sign: self._on_sign,  # 2000
            protobuf.CS_Add: self._on_add,  # 2009
            protobuf.CS_Offer: self._on_offer,  # 2004
            protobuf.CS_Answer: self._on_answer,  # 2005
            protobuf.CS_Candidate: self._on_candidate,  # 2006
            protobuf.CS_Req: self._on_req,  # 2001
        }
        handler = handlers.get(command_id)
        if handler is None:
            methods.debug_log(
                f"{self.connection_id} | SRIConnection.process_data",
                f"unhandled command_id: {command_id}",
            )
            return
        handler(body_data)

    def _write(self, payload):
        """Write ``payload`` to the transport from whichever thread we are on.

        Transports are not thread-safe, so when called outside the event
        loop's thread the write is scheduled with ``call_soon_threadsafe``.
        """
        loop = self._loop
        if loop is None:
            self.client.write(payload)
            return
        try:
            on_loop_thread = asyncio.get_running_loop() is loop
        except RuntimeError:
            on_loop_thread = False
        if on_loop_thread:
            self.client.write(payload)
        else:
            loop.call_soon_threadsafe(self.client.write, payload)

    def _send(self, command_id, message):
        """Serialize ``message``, prefix the header, log the frame and send it.

        Raises:
            struct.error: if the body length does not fit the header's
                signed 16-bit length field.
        """
        body = message.SerializeToString()
        payload = struct.pack(self.head_sequence, command_id, len(body)) + body
        tag = f"{self.connection_id} | SRIConnection.113"
        methods.debug_log(tag, f"re_command_id: {command_id}")
        methods.debug_log(tag, f"re_send_data: {payload}")
        self._write(payload)

    def _on_keep_alive(self, body_data):
        """CS_KeepAlive (2008): nothing is sent back; only log that fact."""
        tag = f"{self.connection_id} | SRIConnection.113"
        methods.debug_log(tag, "re_command_id: None")
        methods.debug_log(tag, "re_send_data: None")

    def _on_sign(self, body_data):
        """CS_Sign (2000): log the credentials and reply with SC_Sign (4000)."""
        # --- receive the request
        request = protobuf.CSSign()
        request.ParseFromString(body_data)
        # NOTE(review): this writes the plaintext password to the debug log.
        methods.debug_log(
            f"{self.connection_id} | SRIConnection.100",
            f"Received Response: account={request.account}, password={request.password}",
        )

        # --- send the response (hard-coded values)
        reply = protobuf.SCSign()
        reply.ret = True
        reply.uid = 112233
        reply.cid = 223344
        reply.name = 'aabbcc'
        self._send(protobuf.SC_Sign, reply)

    def _on_add(self, body_data):
        """CS_Add (2009): send SC_NotifyAdd (4016), then SC_Add (4007).

        CSAdd fields: serial (string), type (int32, EgoType), name (string).
        """
        # --- receive the request
        request = protobuf.CSAdd()
        request.ParseFromString(body_data)
        tag = f"{self.connection_id} | SRIConnection.105"
        methods.debug_log(tag, f"#serial: {request.serial}")
        methods.debug_log(tag, f"#name: {request.name}")

        # --- send 4016
        # TODO: send this to all EgoType::User (cabin-side) clients; for now
        # it goes back to the requesting connection only.
        # Robot fields: rid (int32), name (string), type (int32),
        # state (RobotState: Offline / Online / Busy).
        robot = protobuf.Robot()
        robot.rid = 112233
        robot.name = "aabbcc"
        robot.type = 123
        robot.state = protobuf.Robot.Online
        # SCAddRobot fields: robot (Robot).
        notification = protobuf.SCAddRobot()
        notification.robot.CopyFrom(robot)
        self._send(protobuf.SC_NotifyAdd, notification)

        # --- send 4007
        # TODO: tell the requesting vehicle side whether the add succeeded.
        reply = protobuf.SCAdd()
        reply.ret = True
        reply.uid = 112233
        reply.name = 'aabbcc'
        self._send(protobuf.SC_Add, reply)

    def _on_offer(self, body_data):
        """CS_Offer (2004): echo the offer back as SC_NotifyOffer (4012)."""
        request = protobuf.Offer()
        request.ParseFromString(body_data)
        print(f"Received Response: peer={request.peer}")

        reply = protobuf.Offer()
        reply.index = request.index
        reply.peer = request.peer
        reply.type = request.type
        reply.sdp = request.sdp
        self._send(protobuf.SC_NotifyOffer, reply)

    def _on_answer(self, body_data):
        """CS_Answer (2005): echo the answer back as SC_NotifyAnswer (4011)."""
        request = protobuf.Answer()
        request.ParseFromString(body_data)
        print(f"Received Response: peer={request.peer}")

        # The reply uses the same message type as the request, whose fields
        # are known to exist because they are read just below.
        reply = protobuf.Answer()
        reply.index = request.index
        reply.peer = request.peer
        reply.type = request.type
        reply.sdp = request.sdp
        self._send(protobuf.SC_NotifyAnswer, reply)

    def _on_candidate(self, body_data):
        """CS_Candidate (2006): echo the candidate back as SC_NotifyCandidate (4013)."""
        request = protobuf.Candidate()
        request.ParseFromString(body_data)
        print(f"Received Response: peer={request.peer}")

        # Built as a Candidate so every field copied below exists on the reply.
        reply = protobuf.Candidate()
        reply.index = request.index
        reply.peer = request.peer
        reply.type = request.type
        reply.candidate = request.candidate
        reply.sdpMLineIndex = request.sdpMLineIndex
        reply.sdpMid = request.sdpMid
        reply.egotype = request.egotype
        self._send(protobuf.SC_NotifyCandidate, reply)

    def _on_req(self, body_data):
        """CS_Req (2001): echo the request back as SC_NotifyReq (4009).

        CSReq fields: peer (int32, user id), index (int32, camera position /
        RenderPosition), egotype (int32, terminal type: cabin or vehicle side).
        """
        # TODO: on receiving 2001 from the Qt side, send 4009 to the vehicle
        # side; for now it goes back to the requesting connection.
        request = protobuf.CSReq()
        request.ParseFromString(body_data)
        tag = f"{self.connection_id} | SRIConnection.113"
        methods.debug_log(tag, f"#peer: {request.peer}")
        methods.debug_log(tag, f"#index: {request.index}")
        methods.debug_log(tag, f"#egotype: {request.egotype}")

        reply = protobuf.CSReq()
        reply.peer = request.peer
        reply.index = request.index
        reply.egotype = request.egotype
        self._send(protobuf.SC_NotifyReq, reply)

    def connection_lost(self, exc):
        """Log the disconnect and remove this connection from ``clients``."""
        print(f"SRIConnection closed by {self.peername}")
        # Without this the shared registry grows with every connection.
        self.clients.pop(self.connection_id, None)

    @classmethod
    async def run(cls, host='0.0.0.0', port=20917):
        """Start the server and serve until cancelled.

        Args:
            host: interface to bind to.
            port: TCP port to listen on.
        """
        loop = asyncio.get_running_loop()
        server = await loop.create_server(cls, host, port)
        async with server:
            print(f"Server listening on {host}:{port}")
            methods.debug_log("SRIConnection.221", f"Server listening on {host}:{port}")
            await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(SRIConnection.run())