"""Asyncio TCP server exchanging fixed-size frames of two signed 16-bit ints."""

import asyncio
import struct
from typing import ClassVar, Dict, Iterable, Optional, Set

# Wire format of every frame: two big-endian ("network order") signed shorts.
_FRAME = struct.Struct('!hh')


class CustomProtocol(asyncio.Protocol):
    """Per-connection protocol that answers each frame with both values + 1.

    Every connected transport is also registered in the class-level
    ``clients`` mapping so that ``broadcast`` can reach all of them.
    """

    # Maps each connection's ``peername`` to its transport; shared by all
    # protocol instances.
    clients: ClassVar[Dict[object, asyncio.BaseTransport]] = {}

    def __init__(self) -> None:
        super().__init__()
        # Bytes received so far that do not yet form a complete frame.
        self._buffer = bytearray()
        # Strong references to in-flight handler tasks. The event loop only
        # keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._tasks: Set[asyncio.Task] = set()

    def connection_made(self, client) -> None:
        """Remember the transport, register it in ``clients``, fire the hook.

        Args:
            client: The transport for the new connection.
        """
        self.client = client
        self.client_info = client.get_extra_info('peername')
        print(f"Connection from {self.client_info}", flush=True)
        CustomProtocol.clients[self.client_info] = client
        self.on_connect()

    def data_received(self, data: bytes) -> None:
        """Buffer incoming bytes and schedule a handler per complete frame.

        TCP is a byte stream, so one call may carry a partial frame or
        several frames. Only whole frames are dispatched; a trailing
        remainder stays buffered until more bytes arrive.

        Args:
            data: The bytes delivered by the transport.
        """
        self._buffer.extend(data)
        frame_size = _FRAME.size
        usable = len(self._buffer) - len(self._buffer) % frame_size
        if not usable:
            return
        complete = bytes(self._buffer[:usable])
        del self._buffer[:usable]
        # NOTE: each frame gets its own task, so responses to different
        # frames are not guaranteed to be written in arrival order.
        for start in range(0, usable, frame_size):
            frame = complete[start:start + frame_size]
            task = asyncio.create_task(self.handle_data(frame))
            self._tasks.add(task)
            task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task: asyncio.Task) -> None:
        """Drop a finished handler task and report any exception it raised."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(
                f"Error handling data from {self.client_info}: {error!r}",
                flush=True,
            )

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Unregister the connection, drop buffered bytes, fire the hook.

        Args:
            exc: The exception passed in by the event loop, or ``None``.
        """
        print(f"Connection closed by {self.client_info}", flush=True)
        CustomProtocol.clients.pop(self.client_info, None)
        self._buffer.clear()
        self.on_disconnect()

    def on_connect(self) -> None:
        """Hook called at the end of ``connection_made``; prints a message."""
        print(f"Connected to {self.client_info}", flush=True)

    async def handle_data(self, data: bytes) -> None:
        """Process one frame in the default executor and write the reply.

        Args:
            data: Exactly one packed frame.

        Raises:
            struct.error: Propagated from ``process_data``.
        """
        # Run the computation in the loop's default executor so the event
        # loop is not blocked while it executes.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, self.process_data, data)
        # The peer may have disconnected while the executor was busy.
        if self.client.is_closing():
            return
        self.client.write(result)

    def process_data(self, data: bytes) -> bytes:
        """Unpack a frame, add 1 to both values, and pack the result.

        Args:
            data: One packed ``'!hh'`` frame.

        Returns:
            The packed frame holding both incremented values.

        Raises:
            struct.error: If ``data`` is not exactly one frame long, or an
                incremented value no longer fits in a signed 16-bit integer.
        """
        first, second = _FRAME.unpack(data)
        return _FRAME.pack(first + 1, second + 1)

    def on_disconnect(self) -> None:
        """Hook called at the end of ``connection_lost``; prints a message."""
        print(f"Disconnected from {self.client_info}", flush=True)

    @classmethod
    async def broadcast(cls, message: Iterable[int]) -> None:
        """Send one frame to every registered client that is still open.

        Args:
            message: Two integers, packed as a ``'!hh'`` frame.

        Raises:
            struct.error: If ``message`` cannot be packed as ``'!hh'``.
        """
        data = _FRAME.pack(*message)
        # Transports have no ``drain()`` (that is a StreamWriter method);
        # ``write()`` buffers the data and sends it asynchronously.
        # Iterate over a snapshot so the registry may change meanwhile.
        for transport in list(cls.clients.values()):
            if not transport.is_closing():
                transport.write(data)


async def main(host: str = '0.0.0.0', port: int = 20917) -> None:
    """Start the server on ``host``:``port`` and serve until cancelled.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    loop = asyncio.get_running_loop()
    server = await loop.create_server(CustomProtocol, host, port)
    async with server:
        print(f"Server listening on {host}:{port}", flush=True)
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())