|
@@ -0,0 +1,628 @@
|
|
|
+"""
|
|
|
+"""
|
|
|
+# from hub import methods, Global
|
|
|
+
|
|
|
+import struct
|
|
|
+import asyncio
|
|
|
+import time
|
|
|
+import socket
|
|
|
+
|
|
|
+import sys
|
|
|
+import importlib
|
|
|
+
|
|
|
+# --- for linux
|
|
|
+sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01')
|
|
|
+sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-pysdk')
|
|
|
+
|
|
|
+protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
|
|
|
+methods = importlib.import_module(f"xlib")
|
|
|
+clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
|
|
|
+serial_rid_dict = {
|
|
|
+ '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
|
|
|
+ '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
|
|
|
+ 'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
|
|
|
+ 'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
|
|
|
+ 'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004, # 正在使用
|
|
|
+}
|
|
|
+account_uid_dict = {
|
|
|
+ 'ego': 3
|
|
|
+}
|
|
|
+# live_relationship = {} # {<id-1>id-2>: True}
|
|
|
+
|
|
|
+
|
|
|
+class SRIConnection(asyncio.Protocol):
|
|
|
+ """"""
|
|
|
+
|
|
|
+ head_sequence = '<hh' # 字节序规则
|
|
|
+ head_size = struct.calcsize(head_sequence)
|
|
|
+ message_data = b''
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.lock = asyncio.Lock() # 初始化锁
|
|
|
+
|
|
|
+ def connection_made(self, client):
|
|
|
+ """
|
|
|
+ 建立客户端连接
|
|
|
+ """
|
|
|
+ peername = client.get_extra_info('peername')
|
|
|
+ # self.connection_id = f"{peername[0].replace('.', '')}-{peername[1]}" # 12700130000 客户端id
|
|
|
+ self.connection_id = int(f"{peername[0].replace('.', '')}")
|
|
|
+ self.client = client
|
|
|
+ # self.data = b''
|
|
|
+ self.client_type = None
|
|
|
+ self.client_info = None
|
|
|
+ self.update_at = methods.now_ts()
|
|
|
+ clients[self.connection_id] = self
|
|
|
+
|
|
|
+ # 获取底层 socket
|
|
|
+ # sock = self.client.get_extra_info('socket')
|
|
|
+
|
|
|
+ # 查看缓冲区大小
|
|
|
+ # recv_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
|
|
|
+ # send_buffer_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
|
|
|
+ # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Receive buffer size: {recv_buffer_size}")
|
|
|
+ # methods.debug_log(f'{self.connection_id}|SRIConnection70', f"Send buffer size: {send_buffer_size}")
|
|
|
+
|
|
|
+ def connection_lost(self, exc):
|
|
|
+ """
|
|
|
+ 关闭连接
|
|
|
+ """
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭")
|
|
|
+
|
|
|
+ # --- 处理车端掉线,通知所有舱端
|
|
|
+ if self.client_type == 'vehicle':
|
|
|
+ """
|
|
|
+ SCDelRobot: 消息体
|
|
|
+ SCDelRobot.peer: int32
|
|
|
+ SCDelRobot.egotype: int32
|
|
|
+ """
|
|
|
+ o2 = protobuf.SCDelRobot()
|
|
|
+ o2.peer = self.client_info.get('connection_id') # 车端id
|
|
|
+ o2.egotype = self.client_info.get('egotype')
|
|
|
+ re_command_id = protobuf.SC_NotifyDel # 4017
|
|
|
+ re_body_length = o2.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o2.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {re_command_id}")
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_type == 'cockpit':
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ # --- 处理舱端掉线,通知所有车端
|
|
|
+ if self.client_type == 'cockpit':
|
|
|
+ """
|
|
|
+ Leave: 消息体
|
|
|
+ Leave.peer: int32(车端rid)
|
|
|
+ Leave.egotype: int32
|
|
|
+ """
|
|
|
+ o2 = protobuf.Leave()
|
|
|
+ o2.peer = self.client_info.get('connection_id') # 舱端id
|
|
|
+ o2.egotype = 1 # 客户端类型
|
|
|
+ re_command_id = protobuf.SC_NotifyLeave # 4014
|
|
|
+ re_body_length = o2.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o2.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_type == 'vehicle':
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ # --- clean
|
|
|
+ if self.connection_id in clients:
|
|
|
+ del clients[self.connection_id]
|
|
|
+
|
|
|
+ # --- clean live_relationship
|
|
|
+ # keys = [key for key in live_relationship.keys() if self.connection_id in key]
|
|
|
+ # for key in keys:
|
|
|
+ # del live_relationship[key]
|
|
|
+
|
|
|
+ def data_received(self, data):
|
|
|
+ """
|
|
|
+ 消息处理
|
|
|
+ """
|
|
|
+ asyncio.create_task(self.handle_data(data))
|
|
|
+
|
|
|
+ async def handle_data(self, data):
|
|
|
+ """
|
|
|
+ 消息处理
|
|
|
+ """
|
|
|
+ # --- before
|
|
|
+ # loop = asyncio.get_running_loop()
|
|
|
+ # await loop.run_in_executor(None, self.process_data, data)
|
|
|
+
|
|
|
+ # async with self.lock: # 加入锁,确保只有一个任务在处理数据
|
|
|
+ # await self.process_data(data)
|
|
|
+
|
|
|
+ async with self.lock: # 加入锁,确保只有一个任务在处理数据
|
|
|
+ loop = asyncio.get_running_loop()
|
|
|
+ await loop.run_in_executor(None, self.process_data, data) # 调用非异步函数
|
|
|
+
|
|
|
+ def process_data(self, data):
|
|
|
+ """
|
|
|
+ 消息处理
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}")
|
|
|
+ self.message_data += data # 执行了这个以后
|
|
|
+
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3")
|
|
|
+
|
|
|
+ # count = 0
|
|
|
+ while True:
|
|
|
+
|
|
|
+ # --- print
|
|
|
+ # count += 1
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection123",
|
|
|
+ # f"---------------------------------- while count: {count}")
|
|
|
+
|
|
|
+ # 确保有足够的字节来处理消息头
|
|
|
+ if len(self.message_data) < self.head_size:
|
|
|
+ break
|
|
|
+
|
|
|
+ # 获取消息头
|
|
|
+ head_data = self.message_data[:self.head_size]
|
|
|
+ command_id, body_length = struct.unpack(self.head_sequence, head_data)
|
|
|
+ methods.debug_log(f'{self.connection_id}|SRIConnection130',
|
|
|
+ f"command_id: {command_id}, body_length: {body_length}")
|
|
|
+
|
|
|
+ # 检查命令ID是否有效
|
|
|
+ if not (2000 <= command_id < 5000):
|
|
|
+ self.message_data = b'' # 清空无效数据
|
|
|
+ break
|
|
|
+
|
|
|
+ # 检查完整消息是否接收完毕
|
|
|
+ total_length = self.head_size + body_length
|
|
|
+ if len(self.message_data) < total_length:
|
|
|
+ break # 数据不完整,等待更多数据
|
|
|
+
|
|
|
+ # 调用相应处理方法
|
|
|
+ method = getattr(self, f'message{command_id}', None)
|
|
|
+ if method:
|
|
|
+ method(self.message_data[self.head_size:total_length]) # 处理完整消息
|
|
|
+ else:
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnectionError",
|
|
|
+ f"No handler for command_id: {command_id}")
|
|
|
+
|
|
|
+ # 移除已处理的消息
|
|
|
+ self.message_data = self.message_data[total_length:]
|
|
|
+
|
|
|
+ except Exception as exception:
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")
|
|
|
+
|
|
|
+ def message2008(self, body_data):
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection80", f"message: 2008")
|
|
|
+ pass
|
|
|
+
|
|
|
+ def message2009(self, body_data):
|
|
|
+
|
|
|
+ # --- 监听 2009
|
|
|
+ """
|
|
|
+ CSAdd: 消息体
|
|
|
+ CSAdd.serial: string
|
|
|
+ CSAdd.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
|
|
|
+ CSAdd.name: string
|
|
|
+ """
|
|
|
+ object0 = protobuf.CSAdd()
|
|
|
+ object0.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#serial: {object0.serial}")
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {object0.name}")
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {object0.type}")
|
|
|
+
|
|
|
+ # --- update ---
|
|
|
+ self.client_type = 'vehicle'
|
|
|
+ self.client_info = {
|
|
|
+ 'connection_id': self.connection_id,
|
|
|
+ 'rid': serial_rid_dict.get(object0.serial),
|
|
|
+ 'name': object0.name,
|
|
|
+ 'serial': object0.serial,
|
|
|
+ 'egotype': 2, # 客户端类型
|
|
|
+ }
|
|
|
+
|
|
|
+ """
|
|
|
+ SCAddRobot: 消息体
|
|
|
+ SCAddRobot.robot: Robot
|
|
|
+ Robot: 消息体
|
|
|
+ Robot.rid: int32
|
|
|
+ Robot.name: string
|
|
|
+ Robot.type: int32
|
|
|
+ Robot.state: RobotState Offline Online Busy
|
|
|
+ """
|
|
|
+ o1 = protobuf.Robot()
|
|
|
+ # o1.rid = self.client_info.get('rid')
|
|
|
+ o1.rid = self.client_info.get('connection_id')
|
|
|
+ o1.name = object0.name
|
|
|
+ o1.type = 2 # 0 EgoType::None 1 EgoType::User 2 EgoType::Car
|
|
|
+ o1.state = protobuf.Robot.Online
|
|
|
+ o2 = protobuf.SCAddRobot()
|
|
|
+ o2.robot.CopyFrom(o1)
|
|
|
+ re_command_id = protobuf.SC_NotifyAdd # 4016
|
|
|
+ re_body_length = o2.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o2.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+
|
|
|
+ # --- send 4016 发送全部舱端
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_type == 'cockpit':
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection136", f"re_command_id: {re_command_id}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ # --- send 4007 todo 凯强说并未用到
|
|
|
+ # o1 = protobuf.SCAdd()
|
|
|
+ # o1.ret = True
|
|
|
+ # o1.uid = 112233 # todo 这个uid,应该是哪个舱端永辉在操作这个车 | 但是现在还没有人操控车
|
|
|
+ # o1.name = object0.name
|
|
|
+ # re_command_id = protobuf.SC_Add # 4007
|
|
|
+ # re_body_length = o1.ByteSize()
|
|
|
+ # re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ # re_body_data = o1.SerializeToString()
|
|
|
+ # re_send_data = re_head_data + re_body_data
|
|
|
+ # methods.debug_log(f"{self.connection_id} | SRIConnection150", f"re_command_id: {re_command_id}")
|
|
|
+ # self.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2000(self, body_data):
|
|
|
+
|
|
|
+ # --- 解析消息体
|
|
|
+ object = protobuf.CSSign()
|
|
|
+ object.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {object.account}")
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#password: {object.password}")
|
|
|
+
|
|
|
+ # --- update ---
|
|
|
+ self.client_type = 'cockpit'
|
|
|
+ self.client_info = {
|
|
|
+ 'connection_id': self.connection_id,
|
|
|
+ 'uid': 3, # 对应数据库里的ego的id
|
|
|
+ 'name': 'ego', # 对应数据库里
|
|
|
+ 'egotype': 1, # 舱端类型
|
|
|
+ }
|
|
|
+
|
|
|
+ # --- send 4000
|
|
|
+ object = protobuf.SCSign()
|
|
|
+ object.ret = True
|
|
|
+ # object.uid = self.client_info.get('uid')
|
|
|
+ object.uid = self.client_info.get('connection_id')
|
|
|
+ object.name = self.client_info.get('name')
|
|
|
+ re_command_id = protobuf.SC_Sign # 4000
|
|
|
+ re_body_length = object.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = object.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}")
|
|
|
+ self.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2010(self, body_data):
|
|
|
+
|
|
|
+ # --- send 4008 发送全部车端信息列表
|
|
|
+ """
|
|
|
+ Robot: 消息体
|
|
|
+ Robot.rid: string
|
|
|
+ Robot.name: string
|
|
|
+ Robot.type: int32 EgoType::Car | EgoType::None EgoType::User EgoType::Car
|
|
|
+ Robot.state: enum Offline Online Busy
|
|
|
+ """
|
|
|
+ # --- 获取全部在线车辆列表
|
|
|
+ o2 = protobuf.SCRobot()
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_type and item.client_type == 'vehicle':
|
|
|
+ o1 = protobuf.Robot()
|
|
|
+ # o1.rid = item.client_info.get('rid')
|
|
|
+ o1.rid = item.client_info.get('connection_id')
|
|
|
+ o1.name = item.client_info.get('name')
|
|
|
+ o1.type = 2 # EgoType::Car
|
|
|
+ o1.state = protobuf.Robot.RobotState.Value('Online')
|
|
|
+ o2.robot.add().CopyFrom(o1)
|
|
|
+
|
|
|
+ re_command_id = protobuf.SC_Robot # 4008
|
|
|
+ re_body_length = o2.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o2.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection306", f"re_command_id: {re_command_id}")
|
|
|
+ self.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2001(self, body_data):
|
|
|
+
|
|
|
+ # --- 解析消息体
|
|
|
+ """
|
|
|
+ CSReq: 消息体
|
|
|
+ CSReq.peer: int32(rid,车端唯一标识)
|
|
|
+ CSReq.index: int32(相机位置,RenderPosition)
|
|
|
+ CSReq.egotype: int32(终端类型,舱端/车端)
|
|
|
+ """
|
|
|
+ o1 = protobuf.CSReq()
|
|
|
+ o1.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {o1.peer}")
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#index: {o1.index}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#uid: {self.client_info.get('uid')}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#rid: {self.client_info.get('rid')}")
|
|
|
+
|
|
|
+ # --- send 4009 指定车端
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_info.get('connection_id') == o1.peer:
|
|
|
+ """
|
|
|
+ CSReq: 消息体
|
|
|
+ CSReq.peer: int32(rid,车端唯一标识)
|
|
|
+ CSReq.index: int32(相机位置,RenderPosition)
|
|
|
+ CSReq.egotype: int32(终端类型,舱端/车端)
|
|
|
+ """
|
|
|
+ o2 = protobuf.CSReq()
|
|
|
+ o2.peer = self.client_info.get('connection_id') # 舱端id
|
|
|
+ o2.index = o1.index
|
|
|
+ o2.egotype = o1.egotype
|
|
|
+ re_command_id = protobuf.SC_NotifyReq # 4009
|
|
|
+ re_body_length = o2.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o2.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2002(self, body_data):
|
|
|
+
|
|
|
+ # --- 解析消息体
|
|
|
+ """
|
|
|
+ CSRep: 消息体
|
|
|
+ CSRep.desc: VideoDesc
|
|
|
+ CSRep.peer: int32
|
|
|
+ CSRep.index: int32
|
|
|
+ CSRep.egotype: int32
|
|
|
+ """
|
|
|
+ o1 = protobuf.CSRep()
|
|
|
+ o1.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {o1.peer}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {self.client_info.get('uid')}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {self.client_info.get('rid')}")
|
|
|
+
|
|
|
+ # --- send 4010 指定舱端
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_info.get('connection_id') == o1.peer:
|
|
|
+ o2 = protobuf.CSRep()
|
|
|
+ o2.desc = o1.desc
|
|
|
+ o2.peer = self.client_info.get('connection_id') # 车端id
|
|
|
+ o2.index = o1.index
|
|
|
+ o2.egotype = o1.egotype
|
|
|
+ re_command_id = protobuf.SC_NotifyRep # 4010
|
|
|
+ re_body_length = o2.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o2.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2004(self, body_data):
|
|
|
+
|
|
|
+ # --- 解析消息体
|
|
|
+ """
|
|
|
+ Offer: 消息体
|
|
|
+ Offer.index: int32
|
|
|
+ Offer.peer: int32(车端rid)
|
|
|
+ Offer.type: string
|
|
|
+ Offer.sdp: string
|
|
|
+ """
|
|
|
+ object = protobuf.Offer()
|
|
|
+ object.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {object.peer}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {self.client_info.get('uid')}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {self.client_info.get('rid')}")
|
|
|
+
|
|
|
+ # --- send 4012 指定车端
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_info.get('connection_id') == object.peer:
|
|
|
+ o1 = protobuf.Offer()
|
|
|
+ o1.index = object.index
|
|
|
+ o1.peer = self.client_info.get('connection_id') # 舱端id
|
|
|
+ o1.type = object.type
|
|
|
+ o1.sdp = object.sdp
|
|
|
+ re_command_id = protobuf.SC_NotifyOffer # 4012
|
|
|
+ re_body_length = o1.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o1.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2005(self, body_data):
|
|
|
+
|
|
|
+ # --- 解析消息体
|
|
|
+ """
|
|
|
+ Answer: 消息体
|
|
|
+ Answer.index: int32
|
|
|
+ Answer.peer: int32(舱端uid)
|
|
|
+ Answer.type: string
|
|
|
+ Answer.sdp: string
|
|
|
+ """
|
|
|
+ object = protobuf.Answer()
|
|
|
+ object.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {object.peer}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {self.client_info.get('uid')}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {self.client_info.get('rid')}")
|
|
|
+
|
|
|
+ # --- send 4011 指定舱端
|
|
|
+ for item in clients.values():
|
|
|
+
|
|
|
+ if item.client_info.get('connection_id') == object.peer:
|
|
|
+ o1 = protobuf.Answer()
|
|
|
+ o1.index = object.index
|
|
|
+ o1.peer = self.client_info.get('connection_id') # 车端id
|
|
|
+ o1.type = object.type
|
|
|
+ o1.sdp = object.sdp
|
|
|
+ re_command_id = protobuf.SC_NotifyAnswer # 4011
|
|
|
+ re_body_length = o1.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o1.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}")
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection428", f"#index: {o1.index}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2006(self, body_data):
|
|
|
+
|
|
|
+ # --- 解析消息体
|
|
|
+ """
|
|
|
+ Candidate: 消息体
|
|
|
+ Candidate.index: int32
|
|
|
+ Candidate.peer: int32(车端rid)
|
|
|
+ Candidate.type: string
|
|
|
+ Candidate.candidate: string
|
|
|
+ Candidate.sdpMLineIndex: int32
|
|
|
+ Candidate.sdpMid: string
|
|
|
+ Candidate.egotype: int32
|
|
|
+ """
|
|
|
+ object = protobuf.Candidate()
|
|
|
+ object.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {object.peer}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {self.client_info.get('uid')}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {self.client_info.get('rid')}")
|
|
|
+
|
|
|
+ for item in clients.values():
|
|
|
+
|
|
|
+ # --- send 4013 舱端到车端
|
|
|
+ if item.client_info.get('connection_id') == object.peer:
|
|
|
+ o1 = protobuf.Candidate()
|
|
|
+ o1.index = object.index
|
|
|
+ o1.peer = self.client_info.get('connection_id') # 舱端id,
|
|
|
+ o1.type = object.type
|
|
|
+ o1.candidate = object.candidate
|
|
|
+ o1.sdpMLineIndex = object.sdpMLineIndex
|
|
|
+ o1.sdpMid = object.sdpMid
|
|
|
+ o1.egotype = object.egotype
|
|
|
+ re_command_id = protobuf.SC_NotifyCandidate # 4013
|
|
|
+ re_body_length = o1.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o1.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ # --- send 4013 车端到舱端
|
|
|
+ if item.client_info.get('connection_id') == object.peer:
|
|
|
+ o1 = protobuf.Candidate()
|
|
|
+ o1.index = object.index
|
|
|
+ o1.peer = self.client_info.get('connection_id') # 车端id,
|
|
|
+ o1.type = object.type
|
|
|
+ o1.candidate = object.candidate
|
|
|
+ o1.sdpMLineIndex = object.sdpMLineIndex
|
|
|
+ o1.sdpMid = object.sdpMid
|
|
|
+ o1.egotype = object.egotype
|
|
|
+ re_command_id = protobuf.SC_NotifyCandidate # 4013
|
|
|
+ re_body_length = o1.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o1.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}")
|
|
|
+ # methods.debug_log(f"{self.connection_id}|SRIConnection426", f"#candidate.out: {o1.candidate}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2007(self, body_data):
|
|
|
+
|
|
|
+ # --- 解析消息体
|
|
|
+ """
|
|
|
+ Leave: 消息体
|
|
|
+ Leave.peer: int32(车端rid)
|
|
|
+ Leave.egotype: int32
|
|
|
+ """
|
|
|
+ o1 = protobuf.Leave()
|
|
|
+ o1.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {o1.peer}")
|
|
|
+
|
|
|
+ # --- send 4009 指定车端
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_info.get('connection_id') == o1.peer:
|
|
|
+ """
|
|
|
+ Leave: 消息体
|
|
|
+ Leave.peer: int32(车端rid)
|
|
|
+ Leave.egotype: int32
|
|
|
+ """
|
|
|
+ o2 = protobuf.Leave()
|
|
|
+ o2.peer = self.client_info.get('connection_id') # 舱端id
|
|
|
+ o2.egotype = o1.egotype
|
|
|
+ re_command_id = protobuf.SC_NotifyLeave # 4014
|
|
|
+ re_body_length = o2.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o2.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ def message2014(self, body_data):
|
|
|
+
|
|
|
+ # --- 解析消息体
|
|
|
+ """
|
|
|
+ message CSState
|
|
|
+ {
|
|
|
+ UserState state=1;
|
|
|
+ int32 uid=2;
|
|
|
+ };
|
|
|
+ """
|
|
|
+ o1 = protobuf.CSState()
|
|
|
+ o1.ParseFromString(body_data)
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection530", f"----------------- #state: {o1.state}")
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#uid: {o1.uid}")
|
|
|
+
|
|
|
+ # --- send 4016 发送全部舱端
|
|
|
+ for item in clients.values():
|
|
|
+ if item.client_type == 'cockpit':
|
|
|
+ """
|
|
|
+ message SCState
|
|
|
+ {
|
|
|
+ UserState state=1;
|
|
|
+ int32 uid=2;
|
|
|
+ };
|
|
|
+ """
|
|
|
+ o2 = protobuf.SCState()
|
|
|
+ o2.state = o1.state
|
|
|
+ # o2.uid = self.client_info.get('uid')
|
|
|
+ o2.uid = o1.uid # todo 文磊这边发送过来的车端id
|
|
|
+ re_command_id = protobuf.SC_State # 4022
|
|
|
+ re_body_length = o2.ByteSize()
|
|
|
+ re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
|
|
|
+ re_body_data = o2.SerializeToString()
|
|
|
+ re_send_data = re_head_data + re_body_data
|
|
|
+ methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}")
|
|
|
+ item.client.write(re_send_data)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ async def check_clients():
|
|
|
+ """
|
|
|
+ 剔除掉线连接
|
|
|
+ """
|
|
|
+ count = 0
|
|
|
+ while True:
|
|
|
+ count += 1
|
|
|
+ print(f"#count: {count}", flush=True)
|
|
|
+
|
|
|
+ now_at = methods.now_ts()
|
|
|
+
|
|
|
+ for connection_id in list(clients.keys()):
|
|
|
+ object = clients.get(connection_id)
|
|
|
+ print(f"#client: {object.client}, #update_at: {object.update_at}, #client_type: {object.client_type}",
|
|
|
+ flush=True)
|
|
|
+
|
|
|
+ await asyncio.sleep(3) # 发送消息的间隔时间
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ async def run():
|
|
|
+ """
|
|
|
+ """
|
|
|
+ # --- define ---
|
|
|
+ loop = asyncio.get_running_loop()
|
|
|
+ server = await loop.create_server(
|
|
|
+ lambda: SRIConnection(),
|
|
|
+ '0.0.0.0', 20917
|
|
|
+ )
|
|
|
+
|
|
|
+ # --- start ---
|
|
|
+ async with server:
|
|
|
+ print("Server listening on 0.0.0.0:20917", flush=True)
|
|
|
+ # await loop.create_task(SRIConnection.check_clients())
|
|
|
+ await server.serve_forever()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ asyncio.run(SRIConnection.run())
|