import asyncio
import struct

class CustomProtocol(asyncio.Protocol):
    """Protocol that increments pairs of big-endian int16 values.

    A request is one 4-byte frame packed as ``'!hh'``. Each complete frame is
    handed to the event loop's default executor, both values are incremented
    by one, and the repacked result is written back to the sender. Every
    connected transport is registered in the class-level ``clients`` mapping
    so that ``broadcast`` can reach all peers.

    NOTE: frames are processed concurrently (one task per frame), so replies
    are written in completion order, which may differ from arrival order.
    """

    # Peer address (from get_extra_info('peername')) -> transport.
    # Shared by every instance of the protocol.
    clients = {}

    # Wire format of a single frame: two network-order signed 16-bit ints.
    _FRAME = struct.Struct('!hh')

    def __init__(self):
        super().__init__()
        self.client = None
        self.client_info = None
        # Bytes received so far that do not yet form a complete frame.
        self._buffer = bytearray()
        # Strong references to in-flight tasks; the event loop itself only
        # keeps weak references, so unreferenced tasks may be collected.
        self._tasks = set()

    def connection_made(self, client):
        """Record the new transport and register it for broadcasts.

        Args:
            client: The transport representing the accepted connection.
        """
        self.client = client
        self.client_info = client.get_extra_info('peername')
        print(f"Connection from {self.client_info}", flush=True)
        CustomProtocol.clients[self.client_info] = client
        self.on_connect()

    def data_received(self, data):
        """Buffer incoming bytes and schedule processing of complete frames.

        TCP is a byte stream, so one call may carry a partial frame or
        several frames; only whole frames are dispatched and any remainder
        is kept for the next call.

        Args:
            data: Raw bytes received from the peer.
        """
        self._buffer += data
        for frame in self._take_frames():
            task = asyncio.create_task(self.handle_data(frame))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    def _take_frames(self):
        """Remove and return every complete frame currently buffered."""
        size = self._FRAME.size
        usable = len(self._buffer) - len(self._buffer) % size
        frames = [
            bytes(self._buffer[start:start + size])
            for start in range(0, usable, size)
        ]
        del self._buffer[:usable]
        return frames

    def connection_lost(self, exc):
        """Unregister the transport and discard any partial frame.

        Args:
            exc: The exception that closed the connection, or None.
        """
        print(f"Connection closed by {self.client_info}", flush=True)
        # Only remove the entry if it still refers to this connection, so a
        # newer connection registered under the same address is not evicted.
        if CustomProtocol.clients.get(self.client_info) is self.client:
            del CustomProtocol.clients[self.client_info]
        self._buffer.clear()
        self.on_disconnect()

    def on_connect(self):
        """Hook invoked after a connection has been registered."""
        print(f"Connected to {self.client_info}", flush=True)

    async def handle_data(self, data):
        """Process one frame off the event loop and reply to the sender.

        Args:
            data: A single frame accepted by ``process_data``.
        """
        loop = asyncio.get_running_loop()
        try:
            # Run in the default executor so the event loop is not blocked.
            result = await loop.run_in_executor(None, self.process_data, data)
        except struct.error as exc:
            # Wrong frame size, or an incremented value no longer fits in
            # an int16; drop the frame instead of leaving an unretrieved
            # task exception.
            print(
                f"Dropping invalid frame from {self.client_info}: {exc}",
                flush=True,
            )
            return
        # The peer may have disconnected while the executor was working.
        if not self.client.is_closing():
            self.client.write(result)

    def process_data(self, data):
        """Increment both int16 values of a frame.

        Args:
            data: Exactly four bytes packed as ``'!hh'``.

        Returns:
            The two incremented values packed as ``'!hh'``.

        Raises:
            struct.error: If ``data`` is not four bytes long or an
                incremented value exceeds the int16 range.
        """
        first, second = self._FRAME.unpack(data)
        return self._FRAME.pack(first + 1, second + 1)

    def on_disconnect(self):
        """Hook invoked after a connection has been unregistered."""
        print(f"Disconnected from {self.client_info}", flush=True)

    @classmethod
    async def broadcast(cls, message):
        """Send one frame to every connected client.

        Args:
            message: Iterable of two ints, each within the int16 range.

        Raises:
            struct.error: If ``message`` cannot be packed as ``'!hh'``.
        """
        data = cls._FRAME.pack(*message)
        # Iterate over a snapshot so the registry can change while sending.
        # Transports buffer writes themselves and have no drain() method,
        # so there is nothing to await here.
        for client in list(cls.clients.values()):
            if not client.is_closing():
                client.write(data)

async def main(host='0.0.0.0', port=20917):
    """Start the TCP server and serve connections until cancelled.

    Args:
        host: Interface address to bind; the default listens on all
            IPv4 interfaces.
        port: TCP port to listen on.
    """
    loop = asyncio.get_running_loop()
    # The class itself is the protocol factory: the loop calls it with no
    # arguments to create one protocol instance per accepted connection.
    server = await loop.create_server(CustomProtocol, host, port)

    # The context manager closes the listening sockets on exit.
    async with server:
        print(f"Server listening on {host}:{port}", flush=True)
        await server.serve_forever()

# Start the event loop and run the server only when this file is executed
# as a script; importing the module has no side effects.
if __name__ == "__main__":
    asyncio.run(main())