line_manage.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. """
  2. websocket发数据
  3. """
  4. from hub import methods, Global, numpy_method
  5. import cv2
  6. import base64
  7. import asyncio
  8. import threading
  9. class LineManage(object):
  10. """"""
  11. line_dict = {} # {<line_id>: <ws>} | line_id: websocket连接id | ws: websocket链接对象
  12. @classmethod
  13. def run_forever(cls):
  14. """
  15. 调用协程方法
  16. """
  17. tasks = [cls.check_send()]
  18. _loop = asyncio.new_event_loop()
  19. asyncio.set_event_loop(_loop)
  20. loop = asyncio.get_event_loop()
  21. loop.run_until_complete(asyncio.wait(tasks))
  22. @classmethod
  23. def run_background(cls, is_back_run=True):
  24. """
  25. 后台运行
  26. """
  27. t1 = threading.Thread(target=cls.run_forever)
  28. t1.start()
  29. @classmethod
  30. async def check_send(cls):
  31. # --- define ---
  32. last_send_id = str()
  33. while True:
  34. try:
  35. # --- fill face_type_name_dict ---
  36. """
  37. face_type_name_dict = {<type_uuid>: <name>}
  38. """
  39. face_type_name_dict = dict()
  40. for item in Global.mdb.get_all('FaceType'):
  41. uuid = str(item.get('_id'))
  42. face_type_name_dict[uuid] = item.get('name')
  43. # --- debug ---
  44. # methods.debug_log(f"LineManage", f"m-21: run at {methods.now_string()} "
  45. # f"| {len(cls.line_dict.values())}")
  46. # await asyncio.sleep(3)
  47. # await asyncio.sleep(0.5)
  48. # --- get send_data ---
  49. """
  50. send_data = {
  51. send_id: 数据id
  52. send_list: 数据列表
  53. }
  54. """
  55. send_data = Global.rdb.get_one(key='send_data')
  56. # send_data = db0.get_one(key='send_data')
  57. # --- check ---
  58. if not send_data:
  59. continue
  60. # --- check ---
  61. send_id = send_data.get('send_id')
  62. if not send_id:
  63. continue
  64. # --- check ---
  65. if send_id == last_send_id:
  66. continue
  67. # --- check ---
  68. send_list = send_data.get('send_list')
  69. if send_list is None or len(send_list) == 0:
  70. continue
  71. # --- debug ---
  72. # await asyncio.sleep(3)
  73. # await asyncio.sleep(0.5)
  74. methods.debug_log(f"LineManage", f"m-74: run at {methods.now_string()} "
  75. f"| send count is {len(send_list)} "
  76. f"| online count is {len(cls.line_dict.values())}")
  77. # --- update ---
  78. last_send_id = send_id
  79. # --- send ---
  80. for line_id in list(cls.line_dict.keys()):
  81. try:
  82. # --- check ---
  83. if not cls.check_line_is_live(line_id):
  84. methods.debug_log(f"LineManage", f"m-56: websocket link broken.")
  85. cls.line_dict.pop(line_id)
  86. continue
  87. # --- send ---
  88. """
  89. send_list = [
  90. {
  91. base_face_uuid: 底库人脸id
  92. snap_face_image: 抓拍人脸
  93. base_face_image_path: 底库人脸路径
  94. face_similarity: 相似度
  95. }
  96. ]
  97. """
  98. for data in send_list:
  99. # --- check ---
  100. if data.get('snap_face_image') is None:
  101. continue
  102. # --- define ---
  103. """
  104. send_dict = {
  105. input_face_b64: 抓拍人脸图像
  106. face_uuid: 人脸id
  107. face_name: 人脸名称
  108. known_face_b64: 底库人脸图像
  109. face_similarity: 相似度
  110. face_type_name_list: 人员类型
  111. }
  112. """
  113. send_dict = dict(
  114. input_face_b64=cls.image_to_b64(data.get('snap_face_image')),
  115. # input_face_b64=str(),
  116. known_face_b64=str(),
  117. face_uuid=str(),
  118. face_name=str(),
  119. face_similarity=data.get('face_similarity'),
  120. face_type_name_list=list(),
  121. )
  122. # --- fill input_face_b64 ---
  123. # snap_face_image_path = data.get('snap_face_image_path')
  124. # if snap_face_image_path and methods.is_file(snap_face_image_path):
  125. # frame = cv2.imread(snap_face_image_path)
  126. # if frame is not None:
  127. # _, image = cv2.imencode('.jpg', frame)
  128. # base64_data = base64.b64encode(image) # byte to b64 byte
  129. # s = base64_data.decode() # byte to str
  130. # send_dict['input_face_b64'] = f'data:image/jpeg;base64,{s}'
  131. # --- fill known_face_b64 ---
  132. base_face_image_path = data.get('base_face_image_path')
  133. if base_face_image_path and methods.is_file(base_face_image_path):
  134. frame = cv2.imread(base_face_image_path)
  135. if frame is not None:
  136. _, image = cv2.imencode('.jpg', frame)
  137. base64_data = base64.b64encode(image) # byte to b64 byte
  138. s = base64_data.decode() # byte to str
  139. send_dict['known_face_b64'] = f'data:image/jpeg;base64,{s}'
  140. # --- fill face_uuid and face_name ---
  141. """
  142. Face: 陌生人脸表
  143. Face.face_name: 人脸名称
  144. """
  145. face_uuid = data.get('base_face_uuid')
  146. if face_uuid:
  147. send_dict['face_uuid'] = face_uuid
  148. face = Global.mdb.get_one_by_id('Face', face_uuid)
  149. if face and face.get('face_name'):
  150. send_dict['face_name'] = face.get('face_name')
  151. # --- fill face_type_name_list ---
  152. face_type_uuid_list = face.get('face_type_uuid_list')
  153. if face_type_uuid_list:
  154. send_dict['face_type_name_list'] = [face_type_name_dict.get(i)
  155. for i in face_type_uuid_list
  156. if face_type_name_dict.get(i)]
  157. # --- send ---
  158. # methods.debug_log(f"LineManage", f"m-153: send_dict is {send_dict}")
  159. line = cls.line_dict.get(line_id)
  160. send_json = methods.json_dumps(send_dict)
  161. await line.send_text(send_json)
  162. # await asyncio.sleep(0.1)
  163. except Exception as exception:
  164. # --- check ---
  165. if not cls.check_line_is_live(line_id):
  166. cls.line_dict.pop(line_id)
  167. if exception.__class__.__name__ == 'RuntimeError':
  168. methods.debug_log(f"LineManage", f"m-170: {cls.get_line_state()}")
  169. else:
  170. methods.debug_log('LineManage', f"m-172: exception | {exception}")
  171. methods.debug_log('LineManage', f"m-172: traceback | {methods.trace_log()}")
  172. except Exception as exception:
  173. methods.debug_log('LineManage', f"m-179: exception | {exception}")
  174. methods.debug_log('LineManage', f"m-179: traceback | {methods.trace_log()}")
  175. methods.debug_log('LineManage', f"m-179: wait 1 minutes try again!")
  176. await asyncio.sleep(60)
  177. @classmethod
  178. def get_line_total(cls):
  179. count = 0
  180. for k, v in cls.line_dict.items():
  181. count += 1
  182. return count
  183. @classmethod
  184. def check_line_is_live(cls, line_id):
  185. d1 = {
  186. 0: 'CONNECTING',
  187. 1: 'CONNECTED',
  188. 2: 'DISCONNECTED',
  189. }
  190. line = cls.line_dict.get(line_id)
  191. if line and d1.get(line.client_state.value) != 'DISCONNECTED':
  192. return True
  193. else:
  194. return False
  195. @classmethod
  196. def get_line_state(cls):
  197. d1 = {
  198. 0: 'CONNECTING',
  199. 1: 'CONNECTED',
  200. 2: 'DISCONNECTED',
  201. }
  202. d2 = dict() # {<line_id>: <state>}
  203. for line_id, line in cls.line_dict.items():
  204. state = d1.get(line.client_state.value)
  205. _id = line_id[-6:]
  206. d2[_id] = state
  207. return d2
  208. @staticmethod
  209. def image_to_b64(image):
  210. frame = numpy_method.to_array(image) # list to numpy array
  211. _, image = cv2.imencode('.jpg', frame)
  212. base64_data = base64.b64encode(image)
  213. s = base64_data.decode()
  214. return f'data:image/jpeg;base64,{s}'