123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665 |
- """
- """
- from hub import methods, Global, protobuf
- from werkzeug.security import check_password_hash
- import struct
- import asyncio
- import time
- import socket
# Registry of live connections, shared by every SRIConnection instance.
# Key: the integer connection id computed in SRIConnection.connection_made.
# Value: the SRIConnection instance itself (it exposes .client, .update_at,
# .client_type and .client_info).
clients = {}

# Vehicle hardware serial -> robot id ("rid").
# NOTE(review): two serials map to 1000002 and 1000003 is skipped -- this looks
# like a typo, confirm against the deployed vehicles before changing it.
serial_rid_dict = {
    '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
    '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
    'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
    'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
    'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004,  # currently in use
}

# Account name -> user id.
account_uid_dict = {
    'ego': 3,
}

# A "live_relationship" map ({<id-1>id-2>: True}) was sketched here by the
# original author but is disabled; nothing in this file maintains it.
class SRIConnection(asyncio.Protocol):
    """One TCP connection to the signalling server.

    A connection becomes a ``'vehicle'`` after sending command 2009 and a
    ``'cockpit'`` after a successful sign-in (command 2000). The server relays
    request/offer/answer/candidate/leave messages between the two kinds of
    client and broadcasts vehicle add/remove notifications to cockpits.

    Wire format: a 4-byte little-endian header (int16 command id, int16 body
    length) followed by a serialized protobuf body. A complete frame with
    command id ``N`` is dispatched to the method named ``message<N>``.

    Threading: frames are parsed and handled in the default executor (see
    ``handle_data``), so handlers must not touch transports directly; they go
    through ``_write``, which hands the write back to the event loop.
    """

    head_sequence = '<hh'  # struct format of the frame header
    head_size = struct.calcsize(head_sequence)
    # Receive buffer. ``self.message_data += data`` rebinds it on the instance,
    # so the class-level value is only the initial (immutable) default.
    message_data = b''
    # Event loop that owns the transports; set in connection_made.
    _loop = None

    def __init__(self):
        # Serialises frame processing for this connection (see handle_data).
        self.lock = asyncio.Lock()
        # Strong references to in-flight handle_data tasks. The event loop
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._tasks = set()

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    @classmethod
    def _frame(cls, command_id, message):
        """Return header + serialized body for ``message``.

        Raises ``struct.error`` if the body does not fit the int16 length
        field (same limit as the original per-handler packing code).
        """
        body = message.SerializeToString()
        return struct.pack(cls.head_sequence, command_id, len(body)) + body

    def _write(self, transport, payload):
        """Write ``payload`` to ``transport`` from any thread.

        Handlers run in an executor thread, and asyncio transports are not
        thread-safe, so the write is scheduled on the owning loop. Without a
        known loop (connection_made not called) it falls back to a direct
        write.
        """
        loop = self._loop
        if loop is None:
            transport.write(payload)
        else:
            loop.call_soon_threadsafe(transport.write, payload)

    @staticmethod
    def _clients_of_type(client_type):
        """Return a snapshot list of connections registered as ``client_type``."""
        # list(...) first: the registry may be mutated by the loop thread
        # while a handler thread is iterating.
        return [item for item in list(clients.values())
                if item.client_type == client_type]

    @staticmethod
    def _clients_with_id(peer_id):
        """Return registered connections whose connection id equals ``peer_id``.

        Connections that have not registered yet (``client_info`` is None)
        are skipped instead of raising AttributeError.
        """
        return [item for item in list(clients.values())
                if item.client_info is not None
                and item.client_info.get('connection_id') == peer_id]

    @staticmethod
    def _online_robot(rid, name):
        """Build a ``Robot`` message describing an online vehicle."""
        robot = protobuf.Robot()
        robot.rid = rid
        robot.name = name
        robot.type = 2  # EgoType: 0 None, 1 User, 2 Car
        robot.state = protobuf.Robot.Online
        return robot

    def _log_exception(self, exception):
        """Log an exception and its traceback with the connection id."""
        methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
        methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")

    # ------------------------------------------------------------------
    # asyncio.Protocol callbacks
    # ------------------------------------------------------------------
    def connection_made(self, client):
        """Register a newly accepted connection.

        :param client: the asyncio transport for this connection.
        """
        peername = client.get_extra_info('peername')
        # LAN id scheme: the peer IPv4 address with the dots removed. The
        # original author also tried ``int(peername[1])`` (the peer port) for
        # public-network deployments.
        # NOTE(review): two connections from one address share an id, distinct
        # addresses can collide (1.11.1.1 / 11.1.1.1) and an IPv6 peer makes
        # int() raise -- confirm this is acceptable for the deployment.
        self.connection_id = int(f"{peername[0].replace('.', '')}")
        self.client = client
        self.client_type = None
        self.client_info = None
        self.update_at = methods.now_ts()
        self._loop = asyncio.get_running_loop()
        clients[self.connection_id] = self

    def connection_lost(self, exc):
        """Notify the other side that this client left and unregister it.

        A lost vehicle is announced to every cockpit (SC_NotifyDel); a lost
        cockpit is announced to every vehicle (SC_NotifyLeave).
        """
        methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭")
        try:
            if self.client_type == 'vehicle':
                notice = protobuf.SCDelRobot()
                notice.peer = self.client_info.get('connection_id')  # vehicle id
                notice.egotype = self.client_info.get('egotype')
                command_id = protobuf.SC_NotifyDel  # 4017
                payload = self._frame(command_id, notice)
                methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {command_id}")
                for item in self._clients_of_type('cockpit'):
                    self._write(item.client, payload)
            elif self.client_type == 'cockpit':
                notice = protobuf.Leave()
                notice.peer = self.client_info.get('connection_id')  # cockpit id
                notice.egotype = 1  # cockpit
                command_id = protobuf.SC_NotifyLeave  # 4014
                payload = self._frame(command_id, notice)
                methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {command_id}")
                for item in self._clients_of_type('vehicle'):
                    self._write(item.client, payload)
        finally:
            # Always unregister, even if building the notification failed, but
            # never evict a newer connection that reused the same id.
            if clients.get(self.connection_id) is self:
                del clients[self.connection_id]

    def data_received(self, data):
        """Queue a received chunk for processing."""
        task = asyncio.create_task(self.handle_data(data))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def handle_data(self, data):
        """Process one received chunk off the event loop.

        The lock guarantees that only one chunk of this connection is being
        processed at a time; the work itself runs in the default executor
        because handlers are synchronous (message2000 does a database lookup).
        """
        async with self.lock:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.process_data, data)

    def process_data(self, data):
        """Append ``data`` to the buffer and dispatch every complete frame.

        Incomplete frames stay buffered until more data arrives. A header with
        a command id outside (1000, 9000) or a negative body length means the
        stream is out of sync, so the whole buffer is dropped.
        """
        try:
            self.message_data += data
            while len(self.message_data) >= self.head_size:
                command_id, body_length = struct.unpack_from(self.head_sequence, self.message_data)
                if command_id != 2008:  # 2008 is too frequent to log
                    methods.debug_log(f'{self.connection_id}|SRIConnection176',
                                      f"command_id: {command_id}, body_length: {body_length}")
                # A negative length would make the slice below a no-op and the
                # loop would never terminate.
                if not (1000 < command_id < 9000) or body_length < 0:
                    self.message_data = b''
                    break
                total_length = self.head_size + body_length
                if len(self.message_data) < total_length:
                    break  # frame incomplete, wait for more data
                body = self.message_data[self.head_size:total_length]
                # Consume the frame BEFORE dispatching so that a failing
                # handler cannot leave it in the buffer to be replayed on
                # every subsequent read.
                self.message_data = self.message_data[total_length:]
                handler = getattr(self, f'message{command_id}', None)
                if handler is None:
                    methods.debug_log(f"{self.connection_id}|SRIConnectionError",
                                      f"No handler for command_id: {command_id}")
                    continue
                try:
                    handler(body)
                except Exception as exception:
                    # Boundary: one bad frame must not stop the frames after it.
                    self._log_exception(exception)
        except Exception as exception:
            self._log_exception(exception)

    # ------------------------------------------------------------------
    # command handlers
    # ------------------------------------------------------------------
    def message2008(self, body_data):
        """Command 2008: accepted and ignored."""
        pass

    def message2009(self, body_data):
        """Command 2009 (CSAdd): register this connection as a vehicle.

        CSAdd fields read here: ``serial`` (string) and ``name`` (string).
        Every cockpit is then told about the new vehicle with SC_NotifyAdd
        (SCAddRobot wrapping a Robot).
        """
        request = protobuf.CSAdd()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection219", f"#serial: {request.serial}")
        # Publish client_info before client_type: other handler threads treat
        # a set client_type as "client_info is available".
        self.client_info = {
            'connection_id': self.connection_id,
            'rid': serial_rid_dict.get(request.serial),
            'name': request.name,
            'serial': request.serial,
            'egotype': 2,  # vehicle
        }
        self.client_type = 'vehicle'

        notice = protobuf.SCAddRobot()
        # The connection id (not the serial-derived rid) is what peers use to
        # address this vehicle.
        notice.robot.CopyFrom(self._online_robot(self.client_info.get('connection_id'), request.name))
        command_id = protobuf.SC_NotifyAdd  # 4016
        payload = self._frame(command_id, notice)
        for item in self._clients_of_type('cockpit'):
            methods.debug_log(f"{self.connection_id}|SRIConnection136", f"re_command_id: {command_id}")
            self._write(item.client, payload)
        # TODO: no SC_Add (4007) reply is sent to the vehicle; per the original
        # author's note the vehicle side does not use it.

    def message2000(self, body_data):
        """Command 2000 (CSSign): cockpit sign-in.

        Looks the account up in ``UserInfo``, verifies the password hash and
        rejects accounts whose ``state`` is 1. On success the connection is
        registered as a cockpit. An SC_Sign (4000) reply is always sent.
        """
        request = protobuf.CSSign()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {request.account}")
        # The password is deliberately NOT logged.

        user = Global.mdb.get_one('UserInfo', {'username': request.account})
        name = ''
        uuid = ''
        if not user:
            ret = False
        elif not check_password_hash(user.get('password'), request.password):
            ret = False
        elif user.get('state') and int(user.get('state')) == 1:
            ret = False
        else:
            name = user.get('name')
            uuid = str(user.get('_id'))
            ret = True

        if ret:
            self.client_info = {
                'connection_id': self.connection_id,
                'uid': 3,  # per the original author: the id of the "ego" user in the database
                'name': request.account,
                'egotype': 1,  # cockpit
                'user_uuid': uuid,
            }
            self.client_type = 'cockpit'

        reply = protobuf.SCSign()
        reply.ret = ret
        reply.uid = self.connection_id
        reply.name = name
        reply.user_uuid = uuid
        command_id = protobuf.SC_Sign  # 4000
        payload = self._frame(command_id, reply)
        methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {command_id}")
        self._write(self.client, payload)

    def message2010(self, body_data):
        """Command 2010: reply with the list of online vehicles (SC_Robot, 4008)."""
        reply = protobuf.SCRobot()
        for item in self._clients_of_type('vehicle'):
            robot = self._online_robot(item.client_info.get('connection_id'),
                                       item.client_info.get('name'))
            reply.robot.add().CopyFrom(robot)
        command_id = protobuf.SC_Robot  # 4008
        payload = self._frame(command_id, reply)
        methods.debug_log(f"{self.connection_id}|SRIConnection306", f"re_command_id: {command_id}")
        self._write(self.client, payload)

    def message2001(self, body_data):
        """Command 2001 (CSReq): forward a request to the addressed peer.

        The copy sent on (SC_NotifyReq, 4009) carries the sender's connection
        id in ``peer`` and the original ``index`` and ``egotype``.
        """
        request = protobuf.CSReq()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#peer: {request.peer}")
        methods.debug_log(f"{self.connection_id}|SRIConnection299.message2001", f"#index: {request.index}")
        targets = self._clients_with_id(request.peer)
        if not targets:
            return
        notice = protobuf.CSReq()
        notice.peer = self.client_info.get('connection_id')
        notice.index = request.index
        notice.egotype = request.egotype
        command_id = protobuf.SC_NotifyReq  # 4009
        payload = self._frame(command_id, notice)
        for item in targets:
            methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {command_id}")
            self._write(item.client, payload)

    def message2002(self, body_data):
        """Command 2002 (CSRep): forward a reply to the addressed peer (SC_NotifyRep, 4010)."""
        request = protobuf.CSRep()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {request.peer}")
        targets = self._clients_with_id(request.peer)
        if not targets:
            return
        notice = protobuf.CSRep()
        # NOTE(review): plain assignment only works if ``desc`` is a scalar or
        # enum field; a nested message would need CopyFrom -- confirm in the
        # .proto definition.
        notice.desc = request.desc
        notice.peer = self.client_info.get('connection_id')
        notice.index = request.index
        notice.egotype = request.egotype
        command_id = protobuf.SC_NotifyRep  # 4010
        payload = self._frame(command_id, notice)
        for item in targets:
            methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {command_id}")
            self._write(item.client, payload)

    def message2004(self, body_data):
        """Command 2004 (Offer): forward an SDP offer to the addressed peer (SC_NotifyOffer, 4012)."""
        request = protobuf.Offer()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {request.peer}")
        targets = self._clients_with_id(request.peer)
        if not targets:
            return
        notice = protobuf.Offer()
        notice.index = request.index
        notice.peer = self.client_info.get('connection_id')
        notice.type = request.type
        notice.sdp = request.sdp
        command_id = protobuf.SC_NotifyOffer  # 4012
        payload = self._frame(command_id, notice)
        for item in targets:
            methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {command_id}")
            self._write(item.client, payload)

    def message2005(self, body_data):
        """Command 2005 (Answer): forward an SDP answer to the addressed peer (SC_NotifyAnswer, 4011)."""
        request = protobuf.Answer()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {request.peer}")
        targets = self._clients_with_id(request.peer)
        if not targets:
            return
        notice = protobuf.Answer()
        notice.index = request.index
        notice.peer = self.client_info.get('connection_id')
        notice.type = request.type
        notice.sdp = request.sdp
        command_id = protobuf.SC_NotifyAnswer  # 4011
        payload = self._frame(command_id, notice)
        for item in targets:
            methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {command_id}")
            methods.debug_log(f"{self.connection_id}|SRIConnection428", f"#index: {notice.index}")
            self._write(item.client, payload)

    def message2006(self, body_data):
        """Command 2006 (Candidate): forward an ICE candidate to the addressed peer.

        The same path serves both directions (cockpit to vehicle and vehicle
        to cockpit). The candidate is sent once; the previous code had two
        branches with the identical condition and therefore sent it twice.
        """
        request = protobuf.Candidate()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {request.peer}")
        targets = self._clients_with_id(request.peer)
        if not targets:
            return
        notice = protobuf.Candidate()
        notice.index = request.index
        notice.peer = self.client_info.get('connection_id')
        notice.type = request.type
        notice.candidate = request.candidate
        notice.sdpMLineIndex = request.sdpMLineIndex
        notice.sdpMid = request.sdpMid
        notice.egotype = request.egotype
        command_id = protobuf.SC_NotifyCandidate  # 4013
        payload = self._frame(command_id, notice)
        for item in targets:
            methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {command_id}")
            self._write(item.client, payload)

    def message2007(self, body_data):
        """Command 2007 (Leave): tell the addressed peer that the sender left (SC_NotifyLeave, 4014)."""
        request = protobuf.Leave()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {request.peer}")
        targets = self._clients_with_id(request.peer)
        if not targets:
            return
        notice = protobuf.Leave()
        notice.peer = self.client_info.get('connection_id')
        notice.egotype = request.egotype
        command_id = protobuf.SC_NotifyLeave  # 4014
        payload = self._frame(command_id, notice)
        for item in targets:
            methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {command_id}")
            self._write(item.client, payload)

    def message2014(self, body_data):
        """Command 2014 (CSState): publish a state change.

        1. Broadcast SC_State (4022) with the received ``state`` and ``uid`` to
           every cockpit.
        2. Send S2V_SendUserInfo (6011) to the connection whose id equals
           ``uid``, carrying the sender's user uuid and connection id.
        """
        request = protobuf.CSState()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection|572", f"#state: {request.state}")
        methods.debug_log(f"{self.connection_id}|SRIConnection|572", f"#uid: {request.uid}")

        cockpits = self._clients_of_type('cockpit')
        if cockpits:
            state = protobuf.SCState()
            state.state = request.state
            state.uid = request.uid  # vehicle id
            command_id = protobuf.SC_State  # 4022
            payload = self._frame(command_id, state)
            for item in cockpits:
                methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {command_id}")
                self._write(item.client, payload)

        targets = self._clients_with_id(request.uid)
        if targets:
            activity = protobuf.UserActivityInfo()
            activity.user_uuid = self.client_info.get('user_uuid')
            activity.cockpit_id = self.connection_id
            activity.vehicle_id = request.uid
            command_id = protobuf.S2V_SendUserInfo  # 6011
            payload = self._frame(command_id, activity)
            for item in targets:
                methods.debug_log(f"{self.connection_id}|SRIConnection611", f"re_command_id: {command_id}")
                self._write(item.client, payload)

    # ------------------------------------------------------------------
    # server entry points
    # ------------------------------------------------------------------
    @staticmethod
    async def check_clients():
        """Debug loop: print every registered connection every 3 seconds.

        Despite the original description ("evict dead connections") nothing is
        removed here; ``run`` does not start this coroutine.
        """
        count = 0
        while True:
            count += 1
            print(f"#count: {count}", flush=True)
            for item in list(clients.values()):
                print(f"#client: {item.client}, #update_at: {item.update_at}, #client_type: {item.client_type}",
                      flush=True)
            await asyncio.sleep(3)

    @staticmethod
    async def run():
        """Listen on 0.0.0.0:``Global.egoserver_port`` and serve until cancelled."""
        loop = asyncio.get_running_loop()
        server = await loop.create_server(
            SRIConnection,
            '0.0.0.0', Global.egoserver_port
        )
        async with server:
            print(f"Server listening on 0.0.0.0:{Global.egoserver_port}", flush=True)
            await server.serve_forever()
if __name__ == "__main__":
    # Script entry point: start the server and block until it stops.
    asyncio.run(SRIConnection.run())
|