1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- import asyncio
- import struct
- class CustomProtocol(asyncio.Protocol):
- clients = {}
- def connection_made(self, client):
- self.client = client
- self.client_info = client.get_extra_info('peername')
- print(f"Connection from {self.client_info}", flush=True)
- CustomProtocol.clients[self.client_info] = client
- self.on_connect()
- def data_received(self, data):
- asyncio.create_task(self.handle_data(data))
- def connection_lost(self, exc):
- print(f"Connection closed by {self.client_info}", flush=True)
- if self.client_info in CustomProtocol.clients:
- del CustomProtocol.clients[self.client_info]
- self.on_disconnect()
- def on_connect(self):
- print(f"Connected to {self.client_info}", flush=True)
- async def handle_data(self, data):
- # Simulating a blocking operation with asyncio.run_in_executor
- result = await asyncio.get_running_loop().run_in_executor(None, self.process_data, data)
- self.client.write(result)
- def process_data(self, data):
- # Simulate a CPU-bound task
- values = struct.unpack('!hh', data)
- processed_values = (values[0] + 1, values[1] + 1)
- return struct.pack('!hh', *processed_values)
- def on_disconnect(self):
- print(f"Disconnected from {self.client_info}", flush=True)
- @classmethod
- async def broadcast(cls, message):
- data = struct.pack('!hh', *message)
- for client in cls.clients.values():
- client.write(data)
- await asyncio.gather(*(client.drain() for client in cls.clients.values()))
- async def main():
- loop = asyncio.get_running_loop()
- server = await loop.create_server(
- lambda: CustomProtocol(),
- '0.0.0.0', 20917
- )
- async with server:
- print("Server listening on 0.0.0.0:20917", flush=True)
- await server.serve_forever()
- if __name__ == "__main__":
- asyncio.run(main())
|