import asyncio
import threading
import traceback

from hub import methods, Global


class LineManage(object):
    """Registry of live connections plus a background sweeper for stale ones.

    Connections are kept in the class-level ``line_dict``.  A coroutine
    (``check_loop``), normally started on its own thread through
    ``run_background``, periodically evicts every entry that has not been
    refreshed for ``_STALE_AFTER_SECONDS``.

    NOTE: this loop used to also push recognition results to each connection;
    that code was disabled (commented out) and has been removed here.  Only
    the stale-connection eviction remains.
    """

    # {<username>: (<websocket>, <last_live_at>, <_id>)}
    # NOTE(review): the websocket is presumably a Starlette/FastAPI WebSocket
    # (only ``client_state.value`` is read here) -- confirm against the code
    # that fills this dict.
    line_dict = {}

    # Numeric ``client_state.value`` -> readable state name.
    _STATE_NAMES = {
        0: 'CONNECTING',
        1: 'CONNECTED',
        2: 'DISCONNECTED',
    }

    # An entry is evicted once it has been silent for this many seconds.
    _STALE_AFTER_SECONDS = 3 * 60

    # Pause between two sweeps, and before retrying after an unexpected error.
    _CHECK_INTERVAL_SECONDS = 60

    @classmethod
    def run_forever(cls):
        """Run ``check_loop`` on a fresh event loop; blocks the calling thread.

        A new loop is created because this is meant to run on a worker thread
        that has no event loop of its own.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Hand the coroutine straight to the loop: wrapping it in
            # asyncio.wait() is rejected on Python 3.11+, which no longer
            # accepts bare coroutines there.
            loop.run_until_complete(cls.check_loop())
        finally:
            loop.close()

    @classmethod
    def run_background(cls, is_back_run=True):
        """Start ``run_forever`` on a new thread and return immediately.

        :param is_back_run: currently unused; kept so existing callers that
            pass it keep working.
        """
        worker = threading.Thread(target=cls.run_forever)
        worker.start()

    @classmethod
    async def check_loop(cls):
        """Sweep ``line_dict`` forever, evicting stale entries.

        Every pass looks at all registered usernames and then sleeps for
        ``_CHECK_INTERVAL_SECONDS``.  The sleep happens once per pass (not
        once per username), so an empty registry still yields to the event
        loop instead of spinning, and every entry is compared against a
        fresh timestamp.  Unexpected errors are logged and the loop retries
        after the same pause.
        """
        while True:
            try:
                now_ts = methods.now_ts()

                # Iterate over a snapshot: entries are removed during the sweep
                # and may be added from other threads at any time.
                for username in list(cls.line_dict.keys()):
                    cls._sweep_entry(username, now_ts)

                methods.debug_log("LineManage.check_loop198", "wait 1 minutes check again")
                await asyncio.sleep(cls._CHECK_INTERVAL_SECONDS)

            except Exception as exception:
                methods.debug_log("LineManage.check_loop208", f"#e: {exception.__class__.__name__}")
                methods.debug_log("LineManage.check_loop208", f"#t: {traceback.format_exc()}")
                methods.debug_log("LineManage.check_loop208", "wait 1 minutes try again!")
                await asyncio.sleep(cls._CHECK_INTERVAL_SECONDS)

    @classmethod
    def _sweep_entry(cls, username, now_ts):
        """Evict ``username`` if its entry went stale; log, never raise, on errors."""
        try:
            entry = cls.line_dict.get(username)
            if entry is None:
                # Removed by someone else since the snapshot was taken.
                return

            _ws, last_live_at, _id = entry
            methods.debug_log("LineManage.check_loop87", f"#username: {username}"
                                                         f" | #last_live_at: {last_live_at}"
                                                         f" | #now_ts: {now_ts}")

            # --- check 180s ---
            if now_ts - last_live_at >= cls._STALE_AFTER_SECONDS:
                cls.line_dict.pop(username, None)

        except Exception as exception:
            # Drop the entry if its socket is gone; pop() with a default so a
            # concurrent removal cannot raise from inside this handler.
            if not cls.check_line_is_live(username):
                cls.line_dict.pop(username, None)

            if exception.__class__.__name__ == 'RuntimeError':
                methods.debug_log("LineManage.check_loop194", f"d2: {cls.get_line_state()}")
            else:
                methods.debug_log("LineManage.check_loop194", f"#e: {exception.__class__.__name__}")
                methods.debug_log("LineManage.check_loop194", f"#t: {traceback.format_exc()}")

    @classmethod
    def get_line_total(cls):
        """Return the number of registered connections."""
        return len(cls.line_dict)

    @classmethod
    def check_line_is_live(cls, line_id):
        """Return True when ``line_id`` is registered and not disconnected.

        :param line_id: key into ``line_dict``.
        :return: False when the key is unknown, the stored websocket is
            falsy, or its state is ``DISCONNECTED``; True otherwise.
        """
        entry = cls.line_dict.get(line_id)
        if entry is None:
            return False

        _ws, _, _ = entry
        if not _ws:
            return False
        return cls._STATE_NAMES.get(_ws.client_state.value) != 'DISCONNECTED'

    @classmethod
    def get_line_state(cls):
        """Return ``{<last 6 chars of line_id>: <state name>}`` for all entries.

        The state is ``None`` when the websocket is missing or reports a
        value not listed in ``_STATE_NAMES``.  Keys are truncated for compact
        logging, so two ids sharing the same last six characters collapse
        into one key.
        """
        states = dict()  # {<line_id suffix>: <state>}
        # Snapshot first: the registry can change while this runs.
        for line_id, entry in list(cls.line_dict.items()):
            # Entries are (websocket, last_live_at, _id) tuples; a bare
            # websocket is tolerated as well.
            ws = entry[0] if isinstance(entry, (tuple, list)) else entry
            state = None
            if ws is not None:
                state = cls._STATE_NAMES.get(ws.client_state.value)
            states[line_id[-6:]] = state
        return states