Connection_a1.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. # from hub import methods, Global
  2. import threading
  3. import socket
  4. import struct
  5. import time
  6. # import cv2
  7. # import base64
  8. # import asyncio
  9. import sys
  10. import importlib
  11. sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
  12. protobuf = importlib.import_module(f"protobuf.protocol_pb2")
  13. sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')
  14. methods = importlib.import_module(f"xlib")
  15. class Connection(object):
  16. """"""
  17. tcp_server = None
  18. tcp_client = None
  19. all_connection_dict = {} # {<ipv4>: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)}
  20. @classmethod
  21. def get_method_by_command_id(cls, command_id):
  22. """获取方法名"""
  23. fn_dict = {
  24. protobuf.CS_Sign: cls.f1, # 建立webrtc邀请
  25. }
  26. return fn_dict.get(command_id)
  27. @classmethod
  28. def f1(cls):
  29. pass
  30. # @classmethod
  31. # def run_forever(cls):
  32. # """
  33. # 调用协程方法
  34. # """
  35. # tasks = [cls.check_send()]
  36. # _loop = asyncio.new_event_loop()
  37. # asyncio.set_event_loop(_loop)
  38. # loop = asyncio.get_event_loop()
  39. # loop.run_until_complete(asyncio.wait(tasks))
  40. # @classmethod
  41. # async def check_send(cls):
  42. #
  43. # # --- define ---
  44. # last_send_id = str()
  45. #
  46. # while True:
  47. #
  48. # try:
  49. # # --- fill face_type_name_dict ---
  50. # """
  51. # face_type_name_dict = {<type_uuid>: <name>}
  52. # """
  53. # face_type_name_dict = dict()
  54. # for item in Global.mdb.get_all('FaceType'):
  55. # uuid = str(item.get('_id'))
  56. # face_type_name_dict[uuid] = item.get('name')
  57. #
  58. # # --- debug ---
  59. # # methods.debug_log(f"LineManage", f"m-21: run at {methods.now_string()} "
  60. # # f"| {len(cls.line_dict.values())}")
  61. # # await asyncio.sleep(3)
  62. # # await asyncio.sleep(0.5)
  63. #
  64. # # --- get send_data ---
  65. # """
  66. # send_data = {
  67. # send_id: 数据id
  68. # send_list: 数据列表
  69. # }
  70. # """
  71. # send_data = Global.rdb.get_one(key='send_data')
  72. # # send_data = db0.get_one(key='send_data')
  73. #
  74. # # --- check ---
  75. # if not send_data:
  76. # continue
  77. #
  78. # # --- check ---
  79. # send_id = send_data.get('send_id')
  80. # if not send_id:
  81. # continue
  82. #
  83. # # --- check ---
  84. # if send_id == last_send_id:
  85. # continue
  86. #
  87. # # --- check ---
  88. # send_list = send_data.get('send_list')
  89. # if send_list is None or len(send_list) == 0:
  90. # continue
  91. #
  92. # # --- debug ---
  93. # # await asyncio.sleep(3)
  94. # # await asyncio.sleep(0.5)
  95. # methods.debug_log(f"LineManage", f"m-74: run at {methods.now_string()} "
  96. # f"| send count is {len(send_list)} "
  97. # f"| online count is {len(cls.line_dict.values())}")
  98. #
  99. # # --- update ---
  100. # last_send_id = send_id
  101. #
  102. # # --- send ---
  103. # for line_id in list(cls.line_dict.keys()):
  104. #
  105. # try:
  106. #
  107. # # --- check ---
  108. # if not cls.check_line_is_live(line_id):
  109. # methods.debug_log(f"LineManage", f"m-56: websocket link broken.")
  110. # cls.line_dict.pop(line_id)
  111. # continue
  112. #
  113. # # --- send ---
  114. # """
  115. # send_list = [
  116. # {
  117. # base_face_uuid: 底库人脸id
  118. # snap_face_image: 抓拍人脸
  119. # base_face_image_path: 底库人脸路径
  120. # face_similarity: 相似度
  121. # }
  122. # ]
  123. # """
  124. # for data in send_list:
  125. #
  126. # # --- check ---
  127. # if data.get('snap_face_image') is None:
  128. # continue
  129. #
  130. # # --- define ---
  131. # """
  132. # send_dict = {
  133. # input_face_b64: 抓拍人脸图像
  134. # face_uuid: 人脸id
  135. # face_name: 人脸名称
  136. # known_face_b64: 底库人脸图像
  137. # face_similarity: 相似度
  138. # face_type_name_list: 人员类型
  139. # }
  140. # """
  141. # send_dict = dict(
  142. # input_face_b64=cls.image_to_b64(data.get('snap_face_image')),
  143. # # input_face_b64=str(),
  144. # known_face_b64=str(),
  145. # face_uuid=str(),
  146. # face_name=str(),
  147. # face_similarity=data.get('face_similarity'),
  148. # face_type_name_list=list(),
  149. # )
  150. #
  151. # # --- fill known_face_b64 ---
  152. # base_face_image_path = data.get('base_face_image_path')
  153. # if base_face_image_path and methods.is_file(base_face_image_path):
  154. # frame = cv2.imread(base_face_image_path)
  155. # if frame is not None:
  156. # _, image = cv2.imencode('.jpg', frame)
  157. # base64_data = base64.b64encode(image) # byte to b64 byte
  158. # s = base64_data.decode() # byte to str
  159. # send_dict['known_face_b64'] = f'data:image/jpeg;base64,{s}'
  160. #
  161. # # --- fill face_uuid and face_name ---
  162. # """
  163. # Face: 陌生人脸表
  164. # Face.face_name: 人脸名称
  165. # """
  166. # face_uuid = data.get('base_face_uuid')
  167. # if face_uuid:
  168. # send_dict['face_uuid'] = face_uuid
  169. # face = Global.mdb.get_one_by_id('Face', face_uuid)
  170. # if face and face.get('face_name'):
  171. # send_dict['face_name'] = face.get('face_name')
  172. #
  173. # # --- fill face_type_name_list ---
  174. # face_type_uuid_list = face.get('face_type_uuid_list')
  175. # if face_type_uuid_list:
  176. # send_dict['face_type_name_list'] = [face_type_name_dict.get(i)
  177. # for i in face_type_uuid_list
  178. # if face_type_name_dict.get(i)]
  179. #
  180. # # --- send ---
  181. # # methods.debug_log(f"LineManage", f"m-153: send_dict is {send_dict}")
  182. # line = cls.line_dict.get(line_id)
  183. # send_json = methods.json_dumps(send_dict)
  184. # await line.send_text(send_json)
  185. # # await asyncio.sleep(0.1)
  186. #
  187. # except Exception as exception:
  188. #
  189. # # --- check ---
  190. # if not cls.check_line_is_live(line_id):
  191. # cls.line_dict.pop(line_id)
  192. #
  193. # if exception.__class__.__name__ == 'RuntimeError':
  194. # methods.debug_log(f"LineManage", f"m-170: {cls.get_line_state()}")
  195. # else:
  196. # methods.debug_log('LineManage', f"m-172: exception | {exception}")
  197. # methods.debug_log('LineManage', f"m-172: traceback | {methods.trace_log()}")
  198. #
  199. # except Exception as exception:
  200. #
  201. # methods.debug_log('LineManage', f"m-179: exception | {exception}")
  202. # methods.debug_log('LineManage', f"m-179: traceback | {methods.trace_log()}")
  203. # methods.debug_log('LineManage', f"m-179: wait 1 minutes try again!")
  204. # await asyncio.sleep(60)
  205. # @classmethod
  206. # def get_line_total(cls):
  207. # count = 0
  208. # for k, v in cls.line_dict.items():
  209. # count += 1
  210. # return count
  211. # @classmethod
  212. # def check_line_is_live(cls, line_id):
  213. # d1 = {
  214. # 0: 'CONNECTING',
  215. # 1: 'CONNECTED',
  216. # 2: 'DISCONNECTED',
  217. # }
  218. # line = cls.line_dict.get(line_id)
  219. # if line and d1.get(line.client_state.value) != 'DISCONNECTED':
  220. # return True
  221. # else:
  222. # return False
  223. # @classmethod
  224. # def get_line_state(cls):
  225. # d1 = {
  226. # 0: 'CONNECTING',
  227. # 1: 'CONNECTED',
  228. # 2: 'DISCONNECTED',
  229. # }
  230. # d2 = dict() # {<line_id>: <state>}
  231. # for line_id, line in cls.line_dict.items():
  232. # state = d1.get(line.client_state.value)
  233. # _id = line_id[-6:]
  234. # d2[_id] = state
  235. # return d2
  236. # @staticmethod
  237. # def image_to_b64(image):
  238. # frame = numpy_method.to_array(image) # list to numpy array
  239. # _, image = cv2.imencode('.jpg', frame)
  240. # base64_data = base64.b64encode(image)
  241. # s = base64_data.decode()
  242. # return f'data:image/jpeg;base64,{s}'
  243. @classmethod
  244. def create_tcp_server(cls, host='0.0.0.0', port=20916):
  245. cls.tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  246. cls.tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  247. cls.tcp_server.bind((host, port))
  248. cls.tcp_server.listen(10) # 设置可处理的未处理连接请求的最大数量
  249. methods.debug_log('Connection.260', f"Server listening on {host}:{port}")
  250. @classmethod
  251. def start_listener(cls):
  252. cls.create_tcp_server()
  253. head_sequence = '<hh' # 字节序规则
  254. head_size = struct.calcsize(head_sequence)
  255. while True:
  256. try:
  257. # --- 等待连接 ---
  258. client, ipv4 = cls.tcp_server.accept()
  259. # cls.clients[ipv4] = client, 时间戳, 客户端类型
  260. methods.debug_log('Connection.260', f"Connected by {ipv4}")
  261. # --- 解析数据头
  262. head_bytestream = client.recv(head_size)
  263. if not head_bytestream:
  264. continue
  265. command_id, body_length = struct.unpack(head_sequence, head_bytestream)
  266. methods.debug_log('Connection.284', f"Received values: {command_id, body_length}")
  267. # --- debug ---
  268. # command_id, body_length = struct.unpack('<hh', head_bytestream)
  269. # methods.debug_log('Connection.1', f"Received values: {command_id, body_length}")
  270. # command_id, body_length = struct.unpack('<HH', head_bytestream)
  271. # methods.debug_log('Connection.2', f"Received values: {command_id, body_length}")
  272. # command_id, body_length = struct.unpack('>hh', head_bytestream)
  273. # methods.debug_log('Connection.5', f"Received values: {command_id, body_length}")
  274. # command_id, body_length = struct.unpack('>HH', head_bytestream)
  275. # methods.debug_log('Connection.6', f"Received values: {command_id, body_length}")
  276. # command_id, body_length = struct.unpack('@hh', head_bytestream)
  277. # methods.debug_log('Connection.9', f"Received values: {command_id, body_length}")
  278. # command_id, body_length = struct.unpack('@HH', head_bytestream)
  279. # methods.debug_log('Connection.10', f"Received values: {command_id, body_length}")
  280. # command_id, body_length = struct.unpack('=hh', head_bytestream)
  281. # methods.debug_log('Connection.13', f"Received values: {command_id, body_length}")
  282. # command_id, body_length = struct.unpack('=HH', head_bytestream)
  283. # methods.debug_log('Connection.14', f"Received values: {command_id, body_length}")
  284. # command_id, body_length = struct.unpack('<ii', head_bytestream)
  285. # methods.debug_log('Connection.3', f"Received values: {command_id, body_length}")
  286. # command_id, body_length = struct.unpack('<II', head_bytestream)
  287. # methods.debug_log('Connection.4', f"Received values: {command_id, body_length}")
  288. # command_id, body_length = struct.unpack('>ii', head_bytestream)
  289. # methods.debug_log('Connection.7', f"Received values: {command_id, body_length}")
  290. # command_id, body_length = struct.unpack('>II', head_bytestream)
  291. # methods.debug_log('Connection.8', f"Received values: {command_id, body_length}")
  292. # command_id, body_length = struct.unpack('@ii', head_bytestream)
  293. # methods.debug_log('Connection.11', f"Received values: {command_id, body_length}")
  294. # command_id, body_length = struct.unpack('@II', head_bytestream)
  295. # methods.debug_log('Connection.12', f"Received values: {command_id, body_length}")
  296. # command_id, body_length = struct.unpack('=ii', head_bytestream)
  297. # methods.debug_log('Connection.15', f"Received values: {command_id, body_length}")
  298. # command_id, body_length = struct.unpack('=II', head_bytestream)
  299. # methods.debug_log('Connection.16', f"Received values: {command_id, body_length}")
  300. # --- 解析数据体
  301. if command_id == protobuf.CS_KeepAlive:
  302. continue
  303. elif command_id == protobuf.CS_Sign:
  304. # --- 接收请求数据
  305. body_bytestream = client.recv(body_length)
  306. object = protobuf.CSSign()
  307. object.ParseFromString(body_bytestream)
  308. print(f"Received Response: account={object.account}, password={object.password}")
  309. # --- 发送返回数据
  310. object = protobuf.SCSign()
  311. object.ret = True
  312. object.uid = 112233
  313. object.cid = 223344
  314. object.name = 'aabbcc'
  315. command_id = protobuf.SC_Sign
  316. body_length = object.ByteSize()
  317. head_bytestream = struct.pack(head_sequence, command_id, body_length)
  318. body_bytestream = object.SerializeToString()
  319. send_bytestream = head_bytestream + body_bytestream
  320. client.sendall(send_bytestream)
  321. print(f"Sent data: {send_bytestream}")
  322. # --- test ---
  323. # for _ in range(10):
  324. # for i in range(3)[::-1]:
  325. # methods.debug_log('SRIConnection.58', f"sleep {i}s! | [{ipv4}]")
  326. # time.sleep(1)
  327. # methods.debug_log('SRIConnection.58', f"send re_send_data | [{ipv4}]")
  328. # client.sendall(send_bytestream)
  329. except Exception as exception:
  330. methods.debug_log('Connection.287', f"#exception: {exception}")
  331. methods.debug_log('Connection.287', f"#traceback: {methods.trace_log()}")
  332. finally:
  333. client.close()
  334. @classmethod
  335. def run(cls, background_is=True):
  336. thread_list = [
  337. threading.Thread(target=cls.start_listener), # 人脸检测 deepstream facenet
  338. ]
  339. for thread in thread_list:
  340. thread.setDaemon(True)
  341. thread.start()
  342. if background_is:
  343. return
  344. for thread in thread_list:
  345. thread.join()
  346. if __name__ == '__main__':
  347. print(protobuf.CS_Offer)
  348. Connection.run(background_is=False)