Connection_c1.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
"""
TODO:
1. Implement the periodic check loop over ``self.clients``.
"""
  5. # from hub import methods, Global
  6. import struct
  7. import asyncio
  8. import time
  9. # from concurrent.futures import ThreadPoolExecutor
  10. import sys
  11. import importlib
  12. # --- for linux
  13. sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
  14. sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')
  15. protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
  16. methods = importlib.import_module(f"xlib")
  17. class SRIConnection(asyncio.Protocol):
  18. """"""
  19. head_sequence = '<hh' # 字节序规则
  20. # all_connection_dict = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  21. # executor = ThreadPoolExecutor()
  22. clients = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  23. def connection_made(self, client):
  24. self.client = client
  25. self.peername = client.get_extra_info('peername')
  26. print(f"Connection from {self.peername}")
  27. # --- 自定义参数
  28. self.head_sequence = '<hh' # 字节序规则
  29. self.head_size = struct.calcsize(self.head_sequence)
  30. self.connection_id = f"{self.peername[0].replace('.', '')}{self.peername[1]}" # 12700130000
  31. # --- fill ---
  32. self.clients[self.connection_id] = client, methods.now_ts(), 'type=vehicle'
  33. def data_received(self, data):
  34. asyncio.create_task(self.handle_data(data))
  35. async def handle_data(self, data):
  36. loop = asyncio.get_running_loop()
  37. await loop.run_in_executor(None, self.process_data, data)
  38. def process_data(self, data):
  39. head_data = data[:self.head_size]
  40. body_data = data[self.head_size:]
  41. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  42. methods.debug_log('SRIConnection.37', f"Received values: {command_id}, {body_length}")
  43. if command_id == protobuf.CS_KeepAlive: # 2008
  44. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: None")
  45. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: None")
  46. def connection_lost(self, exc):
  47. print(f"Connection closed by {self.client_info}")
  48. if self.client_info in CustomProtocol.clients:
  49. del CustomProtocol.clients[self.client_info]
  50. self.on_disconnect()
  51. # def connection_lost(self, exc):
  52. # print(f"Connection closed by {self.connection_id}")
  53. # if self.connection_id in SRIConnection.clients:
  54. # del SRIConnection.clients[self.connection_id]
  55. @classmethod
  56. async def check_clients(cls):
  57. count = 0
  58. while True:
  59. count += 1
  60. print(f"#count: {count}", flush=True)
  61. # for connection_id, (client, update_at, client_type) in cls.clients.items():
  62. for connection_id in list(cls.clients.keys()):
  63. client, update_at, client_type = cls.clients.get(connection_id)
  64. print(f"#connection_id: {connection_id}, #update_at: {update_at}, #client_type: {client_type}",
  65. flush=True)
  66. # 创建并发送消息
  67. object = protobuf.SCAdd()
  68. object.ret = True
  69. object.uid = 112233
  70. object.name = 'periodic_message'
  71. re_command_id = protobuf.SC_Add # 4007
  72. re_body_length = object.ByteSize()
  73. re_head_data = struct.pack(cls.head_sequence, re_command_id, re_body_length)
  74. re_body_data = object.SerializeToString()
  75. re_send_data = re_head_data + re_body_data
  76. methods.debug_log(f"{connection_id} | SRIConnection.periodic", f"re_command_id: {re_command_id}")
  77. methods.debug_log(f"{connection_id} | SRIConnection.periodic", f"re_send_data: {re_send_data}")
  78. client.write(re_send_data)
  79. await asyncio.sleep(3) # 发送消息的间隔时间
  80. # @classmethod
  81. # async def check_clients(cls):
  82. # while True:
  83. # print("Checking client connections:", flush=True)
  84. # for client_info in list(cls.clients.keys()):
  85. # methods.debug_log(f"SRIConnection.221", f"Client {client_info} is connected")
  86. # print(f"Client {client_info} is connected", flush=True)
  87. # await asyncio.sleep(3)
  88. @classmethod
  89. async def run(cls):
  90. loop = asyncio.get_running_loop()
  91. server = await loop.create_server(
  92. lambda: SRIConnection(),
  93. '0.0.0.0', 20917
  94. )
  95. # asyncio.create_task(SRIConnection.check_clients())
  96. # await loop.create_task(SRIConnection.check_clients())
  97. async with server:
  98. print("Server listening on 0.0.0.0:20917", flush=True)
  99. methods.debug_log(f"SRIConnection.221", f"Server listening on 0.0.0.0:20917")
  100. await loop.create_task(SRIConnection.check_clients())
  101. await server.serve_forever()
  102. # await send_task # 确保任务在 server 关闭后也继续执行
  103. if __name__ == '__main__':
  104. asyncio.run(SRIConnection.run())
  105. # asyncio.run(main())