| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 | # update: 2022-6-24"""海康人脸摄像机接口    提供 read 1、read 100,两种方法"""import requestsimport datetimeimport timeimport sysimport importlib# sys.path.append('/home/server/projects/taiwuict/cscec-8bur-vms/supplement-python')sys.path.append(r'D:\share\gitee\taiwuict.cscec-8bur-vms\supplement-python')  # for pcmethods = importlib.import_module(f"libraries.base_original")class API(object):    def __init__(self):        self.camera_ipv4 = None        self.camera_user = None        self.camera_pass = None        self.client = None        self.last_receive_at = None    def connect_camera(self, camera_ipv4, camera_user, camera_pass):        """        连接摄像机        """        self.camera_ipv4 = camera_ipv4        self.camera_user = camera_user        self.camera_pass = camera_pass        session = requests.session()        request_url = f'http://{self.camera_ipv4}:80/ISAPI/Event/notification/alertStream'  # 设置认证信息        auth = requests.auth.HTTPDigestAuth(self.camera_user, self.camera_pass)  # 发送请求,获取响应        self.client = session.get(request_url, auth=auth, verify=False, stream=True)    def throw(self, backdata, image_hex):        """        回传数据        """        if backdata is None:            return        if self.camera_ipv4 not in backdata:            backdata[self.camera_ipv4] = dict()        object_id = str(time.time())        backdata[self.camera_ipv4][object_id] = dict()        backdata[self.camera_ipv4][object_id]['tracking_is'] = False        backdata[self.camera_ipv4][object_id]['hex_image'] = image_hex    def read1(self, backdata=None):        """        """        # --- define ---        image_hex = str()        start_is = False        print_is = False        hex_start = ''        hex_end = ''        while True:            # --- get ---            line = self.client.raw.read(1)            line = line.hex()            # --- fill ---            now_at = time.time()            if not self.last_receive_at:                self.last_receive_at = now_at            if line:                self.last_receive_at = now_at            # --- fill ---            hex_start += line            # --- check ---            if len(hex_start) > 8:                hex_start = hex_start[-8:]            # --- check ---            if '0d0affd8' == hex_start:                start_is = True                image_hex = 'ff'                methods.debug_log('hikvision_detector',                                  f"m-44: 0d0affd8 is {datetime.datetime.now().strftime('%H:%M:%S.%f')}")            # --- fill ---            if start_is:                image_hex += line            # --- check ---            if start_is:                hex_end += line                if len(hex_end) > 8:                    hex_end = hex_end[-8:]                if 'ffd90d0a' == hex_end:                    print_is = True                    image_hex = image_hex[:-4]                    methods.debug_log('hikvision_detector',                                      f"m-44: ffd90d0a is {datetime.datetime.now().strftime('%H:%M:%S.%f')}")            # --- fill ---            if print_is:                self.throw(backdata, image_hex)                image_hex = str()                hex_start = str()                hex_end = str()                start_is = False                print_is = Falseif __name__ == '__main__':    # --- init ---    middledata = {}    agent = API()    agent.connect_camera(camera_ipv4='192.168.0.181', camera_user='admin', camera_pass='DEVdev123')    agent.read1({})
 |