123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- # from hub import methods, Global
- import struct
- import asyncio
- import time
- # from concurrent.futures import ThreadPoolExecutor
- import sys
- import importlib
- # --- for linux
- sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
- sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')
- protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
- methods = importlib.import_module(f"xlib")
- class SRIConnection(asyncio.Protocol):
- """"""
- # head_sequence = '<hh' # 字节序规则
- # all_connection_dict = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
- # executor = ThreadPoolExecutor()
- clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
- def connection_made(self, client):
- self.client = client
- self.peername = client.get_extra_info('peername')
- print(f"Connection from {self.peername}")
- # --- 自定义参数
- self.head_sequence = '<hh' # 字节序规则
- self.head_size = struct.calcsize(self.head_sequence)
- self.connection_id = f"{self.peername[0].replace('.', '')}{self.peername[1]}" # 12700130000
- # --- fill ---
- self.clients[self.connection_id] = client, methods.now_ts(), 'type=vehicle'
- def data_received(self, data):
- asyncio.create_task(self.handle_data(data))
- async def handle_data(self, data):
- loop = asyncio.get_running_loop()
- # 全局的 ThreadPoolExecutor,由 asyncio 库自动创建和管理。它的线程池大小默认是机器的CPU核心数乘以5
- await loop.run_in_executor(None, self.process_data, data)
- # 自定义的 ThreadPoolExecutor
- # await loop.run_in_executor(SRIConnection.executor, self.process_data, data)
- # result = await asyncio.get_running_loop().run_in_executor(None, self.process_data, data)
- # if result:
- # self.client.write(result)
- def process_data(self, data):
- head_data = data[:self.head_size]
- body_data = data[self.head_size:]
- command_id, body_length = struct.unpack(self.head_sequence, head_data)
- methods.debug_log('SRIConnection.37', f"Received values: {command_id}, {body_length}")
- if command_id == protobuf.CS_KeepAlive: # 2008
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: None")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: None")
- elif command_id == protobuf.CS_Sign: # 2000
- # --- 接收请求数据
- object = protobuf.CSSign()
- object.ParseFromString(body_data)
- methods.debug_log(f"{self.connection_id} | SRIConnection.100",
- f"Received Response: account={object.account}, password={object.password}")
- # --- 发送返回数据
- object = protobuf.SCSign()
- object.ret = True
- object.uid = 112233
- object.cid = 223344
- object.name = 'aabbcc'
- re_command_id = protobuf.SC_Sign # 4000
- re_body_length = object.ByteSize()
- re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
- re_body_data = object.SerializeToString()
- re_send_data = re_head_data + re_body_data
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
- self.client.write(re_send_data)
- # --- test ---
- # for _ in range(10):
- # for i in range(3)[::-1]:
- # methods.debug_log(f"{self.connection_id} | SRIConnection.92", f"sleep {i}s!")
- # time.sleep(1)
- # methods.debug_log(f"{self.connection_id} | SRIConnection.94", f"send re_send_data")
- # self.client.write(re_send_data)
- elif command_id == protobuf.CS_Add: # 2009
- # --- 接收请求数据 2009
- """
- CSAdd: 消息体
- CSAdd.serial: string
- CSAdd.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
- CSAdd.name: string
- """
- object0 = protobuf.CSAdd()
- object0.ParseFromString(body_data)
- methods.debug_log(f"{self.connection_id} | SRIConnection.105", f"#serial: {object0.serial}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.105", f"#name: {object0.name}")
- # --- 发送返回数据 4016 todo 向所有 EgoType::User 也就是舱端发送消息
- """
- Robot: 消息体
- Robot.rid: int32
- Robot.name: string
- Robot.type: int32
- Robot.state: RobotState Offline Online Busy
- """
- o1 = protobuf.Robot()
- o1.rid = 112233
- o1.name = "aabbcc"
- o1.type = 123
- o1.state = protobuf.Robot.Online
- """
- SCAddRobot: 消息体
- SCAddRobot.robot: Robot
- """
- o2 = protobuf.SCAddRobot()
- o2.robot.CopyFrom(o1)
- re_command_id = protobuf.SC_NotifyAdd # 4016
- re_body_length = o2.ByteSize()
- re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
- re_body_data = o2.SerializeToString()
- re_send_data = re_head_data + re_body_data
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
- self.client.write(re_send_data)
- # --- 发送返回数据 4007 todo 向请求来的车端,返回设置是否成功的消息
- o1 = protobuf.SCAdd()
- o1.ret = True
- o1.uid = 112233
- o1.name = 'aabbcc'
- re_command_id = protobuf.SC_Add # 4007
- re_body_length = o1.ByteSize()
- re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
- re_body_data = o1.SerializeToString()
- re_send_data = re_head_data + re_body_data
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
- self.client.write(re_send_data)
- elif command_id == protobuf.CS_Offer: # 2004
- # --- 接收请求数据
- object = protobuf.Offer()
- object.ParseFromString(body_data)
- print(f"Received Response: peer={object.peer}")
- # --- 发送返回数据
- o1 = protobuf.Offer()
- o1.index = object.index
- o1.peer = object.peer
- o1.type = object.type
- o1.sdp = object.sdp
- re_command_id = protobuf.SC_NotifyOffer # 4012
- re_body_length = o1.ByteSize()
- re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
- re_body_data = o1.SerializeToString()
- re_send_data = re_head_data + re_body_data
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
- self.client.write(re_send_data)
- elif command_id == protobuf.CS_Answer: # 2005
- # --- 接收请求数据
- object = protobuf.Answer()
- object.ParseFromString(body_data)
- print(f"Received Response: peer={object.peer}")
- # --- 发送返回数据
- o1 = protobuf.Offer()
- o1.index = object.index
- o1.peer = object.peer
- o1.type = object.type
- o1.sdp = object.sdp
- re_command_id = protobuf.SC_NotifyAnswer # 4011
- re_body_length = o1.ByteSize()
- re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
- re_body_data = o1.SerializeToString()
- re_send_data = re_head_data + re_body_data
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
- self.client.write(re_send_data)
- elif command_id == protobuf.CS_Candidate: # 2006
- # --- 接收请求数据
- object = protobuf.Candidate()
- object.ParseFromString(body_data)
- print(f"Received Response: peer={object.peer}")
- # --- 发送返回数据
- o1 = protobuf.Offer()
- o1.index = object.index
- o1.peer = object.peer
- o1.type = object.type
- o1.candidate = object.candidate
- o1.sdpMLineIndex = object.sdpMLineIndex
- o1.sdpMid = object.sdpMid
- o1.egotype = object.egotype
- re_command_id = protobuf.SC_NotifyCandidate # 4013
- re_body_length = o1.ByteSize()
- re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
- re_body_data = o1.SerializeToString()
- re_send_data = re_head_data + re_body_data
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
- self.client.write(re_send_data)
- elif command_id == protobuf.CS_Req: # 2001 todo 实现收到qt端2001消息,向车端发送4009消息
- # --- 接收请求数据
- """
- CSReq: 消息体
- CSReq.peer: int32(uid,用户标识)
- CSReq.index: int32(相机位置,RenderPosition)
- CSReq.egotype: int32(终端类型,舱端/车端)
- """
- o1 = protobuf.CSReq()
- o1.ParseFromString(body_data)
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"#peer: {o1.peer}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"#index: {o1.index}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"#egotype: {o1.egotype}")
- # --- 发送返回数据
- """
- CSReq: 消息体
- CSReq.peer: int32(uid,用户标识)
- CSReq.index: int32(相机位置,RenderPosition)
- CSReq.egotype: int32(终端类型,舱端/车端)
- """
- o2 = protobuf.CSReq()
- o2.peer = o1.peer
- o2.index = o1.index
- o2.type = o1.egotype
- re_command_id = protobuf.SC_NotifyReq # 4009
- re_body_length = o2.ByteSize()
- re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
- re_body_data = o2.SerializeToString()
- re_send_data = re_head_data + re_body_data
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
- self.client.write(re_send_data)
- def connection_lost(self, exc):
- print(f"SRIConnection closed by {self.peername}")
- # def connection_lost(self, exc):
- # print(f"Connection closed by {self.connection_id}")
- # if self.connection_id in SRIConnection.clients:
- # del SRIConnection.clients[self.connection_id]
- @classmethod
- async def run(cls):
- loop = asyncio.get_running_loop()
- server = await loop.create_server(
- lambda: SRIConnection(),
- '0.0.0.0', 20917
- )
- async with server:
- print(f"Server listening on 0.0.0.0:20917")
- methods.debug_log(f"SRIConnection.221", f"Server listening on 0.0.0.0:20917")
- await server.serve_forever()
- if __name__ == '__main__':
- asyncio.run(SRIConnection.run())
- # asyncio.run(main())
|