from hub import methods, Global import asyncio import threading import traceback class LineManage(object): """""" line_dict = {} # {: , } @classmethod def run_forever(cls): tasks = [cls.check_loop()] _loop = asyncio.new_event_loop() asyncio.set_event_loop(_loop) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) @classmethod def run_background(cls, is_back_run=True): t1 = threading.Thread(target=cls.run_forever) t1.start() @classmethod async def check_loop(cls): # --- define --- # last_send_id = str() while True: try: # --- debug --- # methods.debug_log(f"LineManage.check_loop44", f"#run at {methods.now_string()} " # f"| {len(cls.line_dict.values())}") # await asyncio.sleep(3) # await asyncio.sleep(0.5) # --- get send_data --- # """ # send_data = { # send_id: 数据id # send_list: 数据列表 # } # """ # send_data = Global.rdb.get_one(key='send_data') # send_data = db0.get_one(key='send_data') # --- check --- # if not send_data: # continue # --- check --- # send_id = send_data.get('send_id') # if not send_id: # continue # --- check --- # if send_id == last_send_id: # continue # --- check --- # send_list = send_data.get('send_list') # if send_list is None or len(send_list) == 0: # continue # --- debug --- # await asyncio.sleep(3) # await asyncio.sleep(0.5) # methods.debug_log(f"LineManage", f"m-74: run at {methods.now_string()} " # f"| send count is {len(send_list)} " # f"| online count is {len(cls.line_dict.values())}") # --- define --- now_ts = methods.now_ts() # --- check --- for username in list(cls.line_dict.keys()): try: # --- debug --- _ws, last_live_at, _id = cls.line_dict.get(username) methods.debug_log(f"LineManage.check_loop87", f"#username: {username}" f" | #last_live_at: {last_live_at}" f" | #now_ts: {now_ts}") # --- check 180s --- if now_ts - last_live_at >= 3 * 60: cls.line_dict.pop(username) # --- check --- # if not cls.check_line_is_live(line_id): # methods.debug_log(f"LineManage", f"m-56: websocket link broken.") # cls.line_dict.pop(line_id) # continue # --- send --- # """ # send_list = [ # { # base_face_uuid: 底库人脸id # snap_face_image: 抓拍人脸 # base_face_image_path: 底库人脸路径 # face_similarity: 相似度 # } # ] # """ # for data in send_list: # # # --- check --- # if data.get('snap_face_image') is None: # continue # # # --- define --- # """ # send_dict = { # input_face_b64: 抓拍人脸图像 # face_uuid: 人脸id # face_name: 人脸名称 # known_face_b64: 底库人脸图像 # face_similarity: 相似度 # face_type_name_list: 人员类型 # } # """ # send_dict = dict( # input_face_b64=cls.image_to_b64(data.get('snap_face_image')), # # input_face_b64=str(), # known_face_b64=str(), # face_uuid=str(), # face_name=str(), # face_similarity=data.get('face_similarity'), # face_type_name_list=list(), # ) # # # --- fill input_face_b64 --- # # snap_face_image_path = data.get('snap_face_image_path') # # if snap_face_image_path and methods.is_file(snap_face_image_path): # # frame = cv2.imread(snap_face_image_path) # # if frame is not None: # # _, image = cv2.imencode('.jpg', frame) # # base64_data = base64.b64encode(image) # byte to b64 byte # # s = base64_data.decode() # byte to str # # send_dict['input_face_b64'] = f'data:image/jpeg;base64,{s}' # # # --- fill known_face_b64 --- # base_face_image_path = data.get('base_face_image_path') # if base_face_image_path and methods.is_file(base_face_image_path): # frame = cv2.imread(base_face_image_path) # if frame is not None: # _, image = cv2.imencode('.jpg', frame) # base64_data = base64.b64encode(image) # byte to b64 byte # s = base64_data.decode() # byte to str # send_dict['known_face_b64'] = f'data:image/jpeg;base64,{s}' # # # --- fill face_uuid and face_name --- # """ # Face: 陌生人脸表 # Face.face_name: 人脸名称 # """ # face_uuid = data.get('base_face_uuid') # if face_uuid: # send_dict['face_uuid'] = face_uuid # face = Global.mdb.get_one_by_id('Face', face_uuid) # if face and face.get('face_name'): # send_dict['face_name'] = face.get('face_name') # # # --- fill face_type_name_list --- # face_type_uuid_list = face.get('face_type_uuid_list') # if face_type_uuid_list: # send_dict['face_type_name_list'] = [face_type_name_dict.get(i) # for i in face_type_uuid_list # if face_type_name_dict.get(i)] # # # --- send --- # # methods.debug_log(f"LineManage", f"m-153: send_dict is {send_dict}") # line = cls.line_dict.get(line_id) # send_json = methods.json_dumps(send_dict) # await line.send_text(send_json) # # methods.debug_log(f"LineManage", # # f"m-161: end at {datetime.datetime.now().strftime('%H:%M:%S.%f')}") # # await asyncio.sleep(0.1) except Exception as exception: # --- check --- if not cls.check_line_is_live(username): cls.line_dict.pop(username) if exception.__class__.__name__ == 'RuntimeError': methods.debug_log(f"LineManage.check_loop194", f"d2: {cls.get_line_state()}") else: methods.debug_log("LineManage.check_loop194", f"#e: {exception.__class__.__name__}") methods.debug_log("LineManage.check_loop194", f"#t: {traceback.format_exc()}") # --- debug --- methods.debug_log(f"LineManage.check_loop198", f"wait 1 minutes check again") await asyncio.sleep(60) # await asyncio.sleep(60) # await asyncio.sleep(60) except Exception as exception: methods.debug_log(f"LineManage.check_loop208", f"#e: {exception.__class__.__name__}") methods.debug_log(f"LineManage.check_loop208", f"#t: {traceback.format_exc()}") methods.debug_log(f"LineManage.check_loop208", f"wait 1 minutes try again!") await asyncio.sleep(60) @classmethod def get_line_total(cls): count = 0 for k, v in cls.line_dict.items(): count += 1 return count @classmethod def check_line_is_live(cls, line_id): d1 = { 0: 'CONNECTING', 1: 'CONNECTED', 2: 'DISCONNECTED', } _ws, _, _ = cls.line_dict.get(line_id) if _ws and d1.get(_ws.client_state.value) != 'DISCONNECTED': return True else: return False @classmethod def get_line_state(cls): d1 = { 0: 'CONNECTING', 1: 'CONNECTED', 2: 'DISCONNECTED', } d2 = dict() # {: } for line_id, line in cls.line_dict.items(): state = d1.get(line.client_state.value) _id = line_id[-6:] d2[_id] = state return d2 # @staticmethod # def image_to_b64(image): # frame = numpy_method.to_array(image) # list to numpy array # _, image = cv2.imencode('.jpg', frame) # base64_data = base64.b64encode(image) # s = base64_data.decode() # return f'data:image/jpeg;base64,{s}'