from hub import methods, Global
from middledata import Middledata
import requests
import time
from typing import Iterable, Iterator, Sequence

# Number of bytes requested per read from the alert stream.
_READ_SIZE = 100
# A JPEG payload starts right after a CRLF, with the JPEG SOI marker (FF D8).
_FRAME_START = b'\r\n\xff\xd8'
# A JPEG payload ends with the JPEG EOI marker (FF D9) followed by a CRLF.
_FRAME_END = b'\xff\xd9\r\n'
# Seconds to wait before reconnecting after any failure.
_RETRY_DELAY_SECONDS = 600


def _read_chunks(raw, size: int = _READ_SIZE) -> Iterator[bytes]:
    """Yield successive ``raw.read(size)`` results.

    Raises:
        ConnectionError: when a read returns no data, i.e. the stream has
            ended. Raising (instead of polling again) lets the caller's
            retry logic reconnect rather than spin on a dead connection.
    """
    while True:
        chunk = raw.read(size)
        if not chunk:
            raise ConnectionError('alert stream closed by the camera')
        yield chunk


def _iter_jpeg_frames(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Yield every complete JPEG image found in a chunked byte stream.

    A frame begins at the ``FF D8`` that directly follows a CRLF and ends
    with the ``FF D9`` that is directly followed by a CRLF; the yielded
    bytes include both JPEG markers but not the surrounding CRLFs.

    The search runs on raw bytes over a rolling buffer, so markers that are
    split across two chunks are still detected and matches are always
    byte-aligned.
    """
    buffer = bytearray()
    in_frame = False
    scan_from = 0  # where to resume looking for the end marker
    start_tail = len(_FRAME_START) - 1
    end_tail = len(_FRAME_END) - 1

    for chunk in chunks:
        buffer += chunk
        while True:
            if not in_frame:
                start = buffer.find(_FRAME_START)
                if start < 0:
                    # Keep only the bytes that could be the beginning of a
                    # start marker completed by the next chunk.
                    if len(buffer) > start_tail:
                        del buffer[:-start_tail]
                    break
                # Drop everything up to and including the CRLF so the
                # buffer now begins at the JPEG SOI marker.
                del buffer[:start + 2]
                in_frame = True
                scan_from = 2

            end = buffer.find(_FRAME_END, scan_from)
            if end < 0:
                # Resume just before the tail next time, so an end marker
                # split across chunks is found without rescanning the frame.
                scan_from = max(2, len(buffer) - end_tail)
                break

            frame_stop = end + 2  # include FF D9, leave the CRLF in place
            yield bytes(buffer[:frame_stop])
            del buffer[:frame_stop]
            in_frame = False


def main(args: Sequence[str]) -> None:
    """Read a Hikvision ISAPI alert stream forever and publish its JPEG images.

    Args:
        args: argv-style sequence; ``args[1]`` is the camera IPv4 address,
            ``args[2]`` the user name and ``args[3]`` the password.
            ``args[0]`` is ignored.

    Every complete JPEG found in the stream is stored in
    ``Middledata.target_dict`` under a key made from the current timestamp,
    as ``{'tracking_is': False, 'hex_image': <lowercase hex of the JPEG>}``.

    The function does not return normally: any exception (including the
    stream being closed by the camera) is logged, followed by a 10 minute
    pause and a reconnect.
    """
    while True:
        try:
            camera_ipv4, camera_user, camera_pass = args[1], args[2], args[3]
            request_url = f'http://{camera_ipv4}:80/ISAPI/Event/notification/alertStream'
            # Set up the authentication information.
            auth = requests.auth.HTTPDigestAuth(camera_user, camera_pass)
            # Send the request; the context managers make sure the connection
            # is released before the next retry.
            with requests.session() as session, \
                    session.get(request_url, auth=auth, verify=False, stream=True) as response:
                print(response.headers)
                print(response.headers.get('content-type'))
                # Fail fast on 401/404/... instead of scanning an error body.
                response.raise_for_status()

                # TODO: check every 5 minutes that the camera's RTSP stream
                # is still reachable (previously sketched here with cv2).
                for frame in _iter_jpeg_frames(_read_chunks(response.raw)):
                    object_id = str(time.time())
                    # Publish the finished entry in one assignment.
                    Middledata.target_dict[object_id] = {
                        'tracking_is': False,
                        'hex_image': frame.hex(),
                    }
        except Exception as exception:
            methods.debug_log('hikvision_detector', f"m-64: exception | {exception}")
            methods.debug_log('hikvision_detector', f"m-64: wait 10 minutes try again!")
            time.sleep(_RETRY_DELAY_SECONDS)


if __name__ == '__main__':
    # NOTE(review): the camera address and credentials are hard-coded here;
    # consider loading them from the command line or configuration instead.
    _args = ['python3', '192.168.0.181', 'admin', 'DEVdev123']
    main(_args)