read100.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. from hub import methods, Global
  2. from middledata import Middledata
  3. import requests
  4. import time
  5. def main(args):
  6. while True:
  7. try:
  8. camera_ipv4, camera_user, camera_pass = args[1], args[2], args[3]
  9. request_url = f'http://{camera_ipv4}:80/ISAPI/Event/notification/alertStream' # 设置认证信息
  10. auth = requests.auth.HTTPDigestAuth(camera_user, camera_pass) # 发送请求,获取响应
  11. session = requests.session()
  12. response = session.get(request_url, auth=auth, verify=False, stream=True)
  13. print(response.headers)
  14. print(response.headers.get('content-type'))
  15. image_hex = str()
  16. start_is = False
  17. print_is = False
  18. while True:
  19. # --- check ---
  20. # now_at = time.time()
  21. # if (now_at - run_at) > 300.0: # todo 5分钟检测一次
  22. # run_at = now_at
  23. # while True:
  24. # rtsp = f"rtsp://{camera_user}:{camera_pass}@{camera_ipv4}:554/h264/ch1/main/av_stream"
  25. # cap = cv2.VideoCapture(rtsp)
  26. # ret, _ = cap.read()
  27. # if ret:
  28. # break
  29. # else:
  30. # methods.debug_log('hikvision_detector', f"m-40: wait 1 minutes try again!")
  31. # time.sleep(60)
  32. # --- get ---
  33. line = response.raw.read(100)
  34. # line = response.raw.read(1024*1024)
  35. # line = response.raw.read(1024*256)
  36. # methods.debug_log('hikvision_detector', f"m-52: check at {methods.now_string()}")
  37. # methods.debug_log('hikvision_detector', f"m-52: line | {line} | {type(line)}")
  38. # --- check ---
  39. if not line:
  40. continue
  41. line = line.hex()
  42. # --- check ---
  43. if '0d0affd8' in line:
  44. # methods.debug_log('hikvision_detector', f"m-52: check start at {methods.now_string()}")
  45. start_is = True
  46. line = 'ffd8' + line.split('0d0affd8')[1]
  47. # --- check ---
  48. if 'ffd90d0a' in line:
  49. # methods.debug_log('hikvision_detector', f"m-52: check end at {methods.now_string()}")
  50. print_is = True
  51. line = line.split('ffd90d0a')[0] + 'ffd9'
  52. # --- fill ---
  53. if start_is:
  54. image_hex += line
  55. # --- save ---
  56. if print_is:
  57. # --- fill ---
  58. object_id = str(time.time())
  59. Middledata.target_dict[object_id] = dict()
  60. Middledata.target_dict[object_id]['tracking_is'] = False
  61. Middledata.target_dict[object_id]['hex_image'] = image_hex # base64.b16decode(image_hex.upper())
  62. # --- update ---
  63. image_hex = str()
  64. start_is = False
  65. print_is = False
  66. except Exception as exception:
  67. methods.debug_log('hikvision_detector', f"m-64: exception | {exception}")
  68. methods.debug_log('hikvision_detector', f"m-64: wait 10 minutes try again!")
  69. time.sleep(600)
  70. continue
  71. if __name__ == '__main__':
  72. _args = ['python3', '192.168.0.181', 'admin', 'DEVdev123']
  73. main(_args)