Connection_b1.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. # from hub import methods, Global
  2. import struct
  3. import asyncio
  4. import time
  5. # from concurrent.futures import ThreadPoolExecutor
  6. import sys
  7. import importlib
  8. # --- for linux
  9. sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
  10. sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')
  11. protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
  12. methods = importlib.import_module(f"xlib")
  13. class SRIConnection(asyncio.Protocol):
  14. """"""
  15. # head_sequence = '<hh' # 字节序规则
  16. # all_connection_dict = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  17. # executor = ThreadPoolExecutor()
  18. clients = {}
  19. def connection_made(self, client):
  20. self.client = client
  21. self.peername = client.get_extra_info('peername')
  22. print(f"Connection from {self.peername}")
  23. # --- 自定义参数
  24. self.head_sequence = '<hh' # 字节序规则
  25. self.head_size = struct.calcsize(self.head_sequence)
  26. self.connection_id = f"{self.peername[0].replace('.', '')}{self.peername[1]}"
  27. def data_received(self, data):
  28. asyncio.create_task(self.handle_data(data))
  29. async def handle_data(self, data):
  30. loop = asyncio.get_running_loop()
  31. # 全局的 ThreadPoolExecutor,由 asyncio 库自动创建和管理。它的线程池大小默认是机器的CPU核心数乘以5
  32. await loop.run_in_executor(None, self.process_data, data)
  33. # 自定义的 ThreadPoolExecutor
  34. # await loop.run_in_executor(SRIConnection.executor, self.process_data, data)
  35. # result = await asyncio.get_running_loop().run_in_executor(None, self.process_data, data)
  36. # if result:
  37. # self.client.write(result)
  38. def process_data(self, data):
  39. head_data = data[:self.head_size]
  40. body_data = data[self.head_size:]
  41. command_id, body_length = struct.unpack(self.head_sequence, head_data)
  42. methods.debug_log('SRIConnection.37', f"Received values: {command_id}, {body_length}")
  43. if command_id == protobuf.CS_KeepAlive: # 2008
  44. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: None")
  45. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: None")
  46. elif command_id == protobuf.CS_Sign: # 2000
  47. # --- 接收请求数据
  48. object = protobuf.CSSign()
  49. object.ParseFromString(body_data)
  50. methods.debug_log(f"{self.connection_id} | SRIConnection.100",
  51. f"Received Response: account={object.account}, password={object.password}")
  52. # --- 发送返回数据
  53. object = protobuf.SCSign()
  54. object.ret = True
  55. object.uid = 112233
  56. object.cid = 223344
  57. object.name = 'aabbcc'
  58. re_command_id = protobuf.SC_Sign # 4000
  59. re_body_length = object.ByteSize()
  60. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  61. re_body_data = object.SerializeToString()
  62. re_send_data = re_head_data + re_body_data
  63. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  64. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  65. self.client.write(re_send_data)
  66. # --- test ---
  67. # for _ in range(10):
  68. # for i in range(3)[::-1]:
  69. # methods.debug_log(f"{self.connection_id} | SRIConnection.92", f"sleep {i}s!")
  70. # time.sleep(1)
  71. # methods.debug_log(f"{self.connection_id} | SRIConnection.94", f"send re_send_data")
  72. # self.client.write(re_send_data)
  73. elif command_id == protobuf.CS_Add: # 2009
  74. # --- 接收请求数据
  75. object = protobuf.CSAdd()
  76. object.ParseFromString(body_data)
  77. methods.debug_log(f"{self.connection_id} | SRIConnection.100",
  78. f"Received Response: serial={object.serial}, name={object.name}")
  79. # --- 发送返回数据
  80. o1 = protobuf.Robot()
  81. o1.rid = 112233
  82. o1.name = "aabbcc"
  83. o1.type = 123
  84. o1.state = protobuf.Robot.Online
  85. o2 = protobuf.SCAddRobot()
  86. o2.robot.CopyFrom(o1)
  87. re_command_id = protobuf.SC_NotifyAdd # 4016
  88. re_body_length = o2.ByteSize()
  89. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  90. re_body_data = o2.SerializeToString()
  91. re_send_data = re_head_data + re_body_data
  92. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  93. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  94. self.client.write(re_send_data)
  95. # --- 发送返回数据
  96. o1 = protobuf.SCAdd()
  97. o1.ret = True
  98. o1.uid = 112233
  99. o1.name = 'aabbcc'
  100. re_command_id = protobuf.SC_Add # 4007
  101. re_body_length = o1.ByteSize()
  102. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  103. re_body_data = o1.SerializeToString()
  104. re_send_data = re_head_data + re_body_data
  105. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  106. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  107. self.client.write(re_send_data)
  108. elif command_id == protobuf.CS_Offer: # 2004
  109. # --- 接收请求数据
  110. object = protobuf.Offer()
  111. object.ParseFromString(body_data)
  112. print(f"Received Response: peer={object.peer}")
  113. # --- 发送返回数据
  114. o1 = protobuf.Offer()
  115. o1.index = object.index
  116. o1.peer = object.peer
  117. o1.type = object.type
  118. o1.sdp = object.sdp
  119. re_command_id = protobuf.SC_NotifyOffer # 4012
  120. re_body_length = o1.ByteSize()
  121. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  122. re_body_data = o1.SerializeToString()
  123. re_send_data = re_head_data + re_body_data
  124. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  125. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  126. self.client.write(re_send_data)
  127. elif command_id == protobuf.CS_Answer: # 2005
  128. # --- 接收请求数据
  129. object = protobuf.Answer()
  130. object.ParseFromString(body_data)
  131. print(f"Received Response: peer={object.peer}")
  132. # --- 发送返回数据
  133. o1 = protobuf.Offer()
  134. o1.index = object.index
  135. o1.peer = object.peer
  136. o1.type = object.type
  137. o1.sdp = object.sdp
  138. re_command_id = protobuf.SC_NotifyAnswer # 4011
  139. re_body_length = o1.ByteSize()
  140. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  141. re_body_data = o1.SerializeToString()
  142. re_send_data = re_head_data + re_body_data
  143. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  144. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  145. self.client.write(re_send_data)
  146. elif command_id == protobuf.CS_Candidate: # 2006
  147. # --- 接收请求数据
  148. object = protobuf.Candidate()
  149. object.ParseFromString(body_data)
  150. print(f"Received Response: peer={object.peer}")
  151. # --- 发送返回数据
  152. o1 = protobuf.Offer()
  153. o1.index = object.index
  154. o1.peer = object.peer
  155. o1.type = object.type
  156. o1.candidate = object.candidate
  157. o1.sdpMLineIndex = object.sdpMLineIndex
  158. o1.sdpMid = object.sdpMid
  159. o1.egotype = object.egotype
  160. re_command_id = protobuf.SC_NotifyCandidate # 4013
  161. re_body_length = o1.ByteSize()
  162. re_head_data = struct.pack(self.head_sequence, re_command_id, re_body_length)
  163. re_body_data = o1.SerializeToString()
  164. re_send_data = re_head_data + re_body_data
  165. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_command_id: {re_command_id}")
  166. methods.debug_log(f"{self.connection_id} | SRIConnection.113", f"re_send_data: {re_send_data}")
  167. self.client.write(re_send_data)
  168. def connection_lost(self, exc):
  169. print(f"SRIConnection closed by {self.peername}")
  170. # def connection_lost(self, exc):
  171. # print(f"Connection closed by {self.connection_id}")
  172. # if self.connection_id in SRIConnection.clients:
  173. # del SRIConnection.clients[self.connection_id]
  174. @classmethod
  175. async def run(cls):
  176. loop = asyncio.get_running_loop()
  177. server = await loop.create_server(
  178. lambda: SRIConnection(),
  179. '0.0.0.0', 20917
  180. )
  181. async with server:
  182. print(f"Server listening on 0.0.0.0:20917")
  183. methods.debug_log(f"SRIConnection.221", f"Server listening on 0.0.0.0:20917")
  184. await server.serve_forever()
  185. # async def main():
  186. # loop = asyncio.get_running_loop()
  187. # server = await loop.create_server(
  188. # lambda: SRIConnection(),
  189. # '0.0.0.0', 20917
  190. # )
  191. #
  192. # async with server:
  193. # print("Server listening on 0.0.0.0:20917")
  194. # await server.serve_forever()
  195. if __name__ == '__main__':
  196. asyncio.run(SRIConnection.run())
  197. # asyncio.run(main())