# xudp.py
"""Asyncio UDP server that replies to pairs of 16-bit integers.

Usage:
    python3 /home/server/repositories/repositories/sri-project.demo-py/3rdparty/xserver/xudp.py
"""
  4. import asyncio
  5. import struct
  6. # from concurrent.futures import ThreadPoolExecutor
  7. class CustomProtocol:
  8. clients = {}
  9. # executor = ThreadPoolExecutor()
  10. def connection_made(self, transport):
  11. self.transport = transport
  12. self.peername = None # UDP没有真正的连接,因此peername为空
  13. print(f"UDP Server started", flush=True)
  14. def datagram_received(self, data, addr):
  15. asyncio.create_task(self.handle_data(data, addr))
  16. def error_received(self, exc):
  17. print(f"Error received: {exc}", flush=True)
  18. async def handle_data(self, data, addr):
  19. if len(data) < 4:
  20. return # Ensure complete data packet is received
  21. loop = asyncio.get_running_loop()
  22. values = await loop.run_in_executor(None, self.process_data, data)
  23. # values = await loop.run_in_executor(CustomProtocol.executor, self.process_data, data)
  24. print(f"Received values: {values} from {addr}", flush=True)
  25. response = struct.pack('!hh', values[0] + 1, values[1] + 1)
  26. self.transport.sendto(response, addr)
  27. @staticmethod
  28. def process_data(data):
  29. values = struct.unpack('!hh', data)
  30. print(f"Processing data: {values}", flush=True)
  31. # Simulate some processing
  32. return (values[0] + 1, values[1] + 1)
  33. async def main():
  34. loop = asyncio.get_running_loop()
  35. transport, protocol = await loop.create_datagram_endpoint(
  36. lambda: CustomProtocol(),
  37. local_addr=('127.0.0.1', 20917)
  38. )
  39. print("UDP Server listening on 127.0.0.1:20917", flush=True)
  40. try:
  41. await asyncio.Future() # Keep running indefinitely
  42. finally:
  43. transport.close() # Close the transport
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())