123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- """
- todo
- 1、实现 self.clients 循环检查逻辑
- """
- # from hub import methods, Global
- import struct
- import asyncio
- import time
- # from concurrent.futures import ThreadPoolExecutor
- import sys
- import importlib
- # --- for linux
- sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
- sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')
- protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
- methods = importlib.import_module(f"xlib")
- class SRIConnection(asyncio.Protocol):
- """"""
- head_sequence = '<hh' # 字节序规则
- # all_connection_dict = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
- # executor = ThreadPoolExecutor()
- clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
- def connection_made(self, client):
- self.client = client
- self.peername = client.get_extra_info('peername')
- print(f"Connection from {self.peername}")
- # --- 自定义参数
- self.head_sequence = '<hh' # 字节序规则
- self.head_size = struct.calcsize(self.head_sequence)
- self.connection_id = f"{self.peername[0].replace('.', '')}{self.peername[1]}" # 12700130000
- # --- fill ---
- self.clients[self.connection_id] = client, methods.now_ts(), 'type=vehicle'
- def data_received(self, data):
- asyncio.create_task(self.handle_data(data))
- async def handle_data(self, data):
- loop = asyncio.get_running_loop()
- await loop.run_in_executor(None, self.process_data, data)
- def process_data(self, data):
- head_data = data[:self.head_size]
- body_data = data[self.head_size:]
- command_id, body_length = struct.unpack(self.head_sequence, head_data)
- methods.debug_log('SRIConnection.37', f"Received values: {command_id}, {body_length}")
- if command_id == protobuf.CS_KeepAlive: # 2008
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: None")
- methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: None")
- def connection_lost(self, exc):
- print(f"Connection closed by {self.client_info}")
- if self.client_info in CustomProtocol.clients:
- del CustomProtocol.clients[self.client_info]
- self.on_disconnect()
- # def connection_lost(self, exc):
- # print(f"Connection closed by {self.connection_id}")
- # if self.connection_id in SRIConnection.clients:
- # del SRIConnection.clients[self.connection_id]
- @classmethod
- async def check_clients(cls):
- count = 0
- while True:
- count += 1
- print(f"#count: {count}", flush=True)
- # for connection_id, (client, update_at, client_type) in cls.clients.items():
- for connection_id in list(cls.clients.keys()):
- client, update_at, client_type = cls.clients.get(connection_id)
- print(f"#connection_id: {connection_id}, #update_at: {update_at}, #client_type: {client_type}",
- flush=True)
- # 创建并发送消息
- object = protobuf.SCAdd()
- object.ret = True
- object.uid = 112233
- object.name = 'periodic_message'
- re_command_id = protobuf.SC_Add # 4007
- re_body_length = object.ByteSize()
- re_head_data = struct.pack(cls.head_sequence, re_command_id, re_body_length)
- re_body_data = object.SerializeToString()
- re_send_data = re_head_data + re_body_data
- methods.debug_log(f"{connection_id} | SRIConnection.periodic", f"re_command_id: {re_command_id}")
- methods.debug_log(f"{connection_id} | SRIConnection.periodic", f"re_send_data: {re_send_data}")
- client.write(re_send_data)
- await asyncio.sleep(3) # 发送消息的间隔时间
- # @classmethod
- # async def check_clients(cls):
- # while True:
- # print("Checking client connections:", flush=True)
- # for client_info in list(cls.clients.keys()):
- # methods.debug_log(f"SRIConnection.221", f"Client {client_info} is connected")
- # print(f"Client {client_info} is connected", flush=True)
- # await asyncio.sleep(3)
- @classmethod
- async def run(cls):
- loop = asyncio.get_running_loop()
- server = await loop.create_server(
- lambda: SRIConnection(),
- '0.0.0.0', 20917
- )
- # asyncio.create_task(SRIConnection.check_clients())
- # await loop.create_task(SRIConnection.check_clients())
- async with server:
- print("Server listening on 0.0.0.0:20917", flush=True)
- methods.debug_log(f"SRIConnection.221", f"Server listening on 0.0.0.0:20917")
- await loop.create_task(SRIConnection.check_clients())
- await server.serve_forever()
- # await send_task # 确保任务在 server 关闭后也继续执行
- if __name__ == '__main__':
- asyncio.run(SRIConnection.run())
- # asyncio.run(main())
|