123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- # update: 2022-6-24
- """
- 海康人脸摄像机接口
- 提供 read 1、read 100,两种方法
- """
- import requests
- import datetime
- import time
- import sys
- import importlib
- # sys.path.append('/home/server/projects/taiwuict/cscec-8bur-vms/supplement-python')
- sys.path.append(r'D:\share\gitee\taiwuict.cscec-8bur-vms\supplement-python') # for pc
- methods = importlib.import_module(f"libraries.base_original")
class API(object):
    """Read JPEG frames from a Hikvision camera's ISAPI alert stream.

    Typical use: call :meth:`connect_camera` once to open the streaming HTTP
    response, then call :meth:`read1` to scan the stream byte by byte and
    hand every complete JPEG to :meth:`throw`.
    """

    # A frame starts at CRLF followed by the JPEG start marker (FF D8) and
    # ends at the JPEG end marker (FF D9) followed by CRLF.
    _FRAME_START = b'\r\n\xff\xd8'
    _FRAME_END = b'\xff\xd9\r\n'
    # Pause between read attempts once the stream has stopped delivering
    # data, so the loop no longer burns a full CPU core while waiting.
    _EOF_IDLE_SECONDS = 0.05

    def __init__(self):
        self.camera_ipv4 = None
        self.camera_user = None
        self.camera_pass = None
        # Streaming HTTP response created by connect_camera().
        self.client = None
        # time.time() of the latest received byte (set on the first read
        # attempt if nothing has been received yet).
        # NOTE(review): presumably consumed by an external watchdog — no
        # reader is visible in this file.
        self.last_receive_at = None

    def connect_camera(self, camera_ipv4, camera_user, camera_pass,
                       port=80, timeout=None):
        """Connect to the camera and open its alert stream.

        Args:
            camera_ipv4: Camera address used to build the request URL.
            camera_user: User name for HTTP digest authentication.
            camera_pass: Password for HTTP digest authentication.
            port: HTTP port of the camera (default 80, the original value).
            timeout: Passed through to ``requests``; ``None`` (the default)
                keeps the original behaviour of waiting without a limit.

        The streaming response is stored in ``self.client``.
        """
        self.camera_ipv4 = camera_ipv4
        self.camera_user = camera_user
        self.camera_pass = camera_pass
        session = requests.session()
        request_url = (
            f'http://{self.camera_ipv4}:{port}'
            f'/ISAPI/Event/notification/alertStream'
        )
        # Digest authentication credentials for the request.
        auth = requests.auth.HTTPDigestAuth(self.camera_user, self.camera_pass)
        # stream=True keeps the body unread so read1() can pull raw bytes.
        self.client = session.get(
            request_url, auth=auth, verify=False, stream=True, timeout=timeout,
        )

    def throw(self, backdata, image_hex):
        """Store one captured image in the caller-supplied mapping.

        Args:
            backdata: Mapping that collects results, or ``None`` to discard
                the image. Entries are written as
                ``backdata[camera_ipv4][str(time.time())]``.
            image_hex: The image bytes encoded as a hex string.
        """
        if backdata is None:
            return
        if self.camera_ipv4 not in backdata:
            backdata[self.camera_ipv4] = dict()
        object_id = str(time.time())
        backdata[self.camera_ipv4][object_id] = {
            'tracking_is': False,
            'hex_image': image_hex,
        }

    def read1(self, backdata=None, stop_on_eof=False):
        """Scan the stream one byte at a time and emit every complete JPEG.

        Each image found between the start and end markers is passed to
        :meth:`throw` as a hex string (without the trailing CRLF).

        Args:
            backdata: Mapping forwarded to :meth:`throw`; ``None`` discards
                the images.
            stop_on_eof: When true, return as soon as the stream yields no
                more data. When false (default) the method keeps polling
                ``self.client`` forever, as before, but sleeps briefly
                between attempts instead of busy-spinning.
        """
        window = b''          # last four bytes seen since the previous frame
        frame = bytearray()   # bytes of the image currently being captured
        capturing = False

        while True:
            # self.client is looked up on every pass, so a client replaced
            # by connect_camera() in the meantime is picked up.
            chunk = self.client.raw.read(1)

            now_at = time.time()
            if not self.last_receive_at:
                self.last_receive_at = now_at
            if not chunk:
                # No data: the stream is exhausted or closed.
                if stop_on_eof:
                    return
                time.sleep(self._EOF_IDLE_SECONDS)
                continue
            self.last_receive_at = now_at

            window = (window + chunk)[-4:]

            if window == self._FRAME_START:
                # (Re)start the capture; FF was consumed before the marker
                # completed, the current byte (D8) is appended below.
                capturing = True
                frame = bytearray(b'\xff')
                methods.debug_log('hikvision_detector',
                                  f"m-44: 0d0affd8 is {datetime.datetime.now().strftime('%H:%M:%S.%f')}")

            if not capturing:
                continue

            frame += chunk

            if window == self._FRAME_END:
                methods.debug_log('hikvision_detector',
                                  f"m-44: ffd90d0a is {datetime.datetime.now().strftime('%H:%M:%S.%f')}")
                # Drop the trailing CRLF so the image ends at FF D9.
                self.throw(backdata, bytes(frame[:-2]).hex())
                window = b''
                frame = bytearray()
                capturing = False
if __name__ == '__main__':
    # Manual run: connect to a camera and read its stream indefinitely.
    middledata = dict()
    agent = API()
    agent.connect_camera('192.168.0.181', 'admin', 'DEVdev123')
    # A throw-away dict receives the captured frames.
    agent.read1(dict())
|