api.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. # update: 2022-6-24
  2. """
  3. 海康人脸摄像机接口
  4. 提供 read 1、read 100,两种方法
  5. """
  6. import requests
  7. import datetime
  8. import time
  9. import sys
  10. import importlib
  11. # sys.path.append('/home/server/projects/taiwuict/cscec-8bur-vms/supplement-python')
  12. sys.path.append(r'D:\share\gitee\taiwuict.cscec-8bur-vms\supplement-python') # for pc
  13. methods = importlib.import_module(f"libraries.base_original")
  14. class API(object):
  15. def __init__(self):
  16. self.camera_ipv4 = None
  17. self.camera_user = None
  18. self.camera_pass = None
  19. self.client = None
  20. self.last_receive_at = None
  21. def connect_camera(self, camera_ipv4, camera_user, camera_pass):
  22. """
  23. 连接摄像机
  24. """
  25. self.camera_ipv4 = camera_ipv4
  26. self.camera_user = camera_user
  27. self.camera_pass = camera_pass
  28. session = requests.session()
  29. request_url = f'http://{self.camera_ipv4}:80/ISAPI/Event/notification/alertStream' # 设置认证信息
  30. auth = requests.auth.HTTPDigestAuth(self.camera_user, self.camera_pass) # 发送请求,获取响应
  31. self.client = session.get(request_url, auth=auth, verify=False, stream=True)
  32. def throw(self, backdata, image_hex):
  33. """
  34. 回传数据
  35. """
  36. if backdata is None:
  37. return
  38. if self.camera_ipv4 not in backdata:
  39. backdata[self.camera_ipv4] = dict()
  40. object_id = str(time.time())
  41. backdata[self.camera_ipv4][object_id] = dict()
  42. backdata[self.camera_ipv4][object_id]['tracking_is'] = False
  43. backdata[self.camera_ipv4][object_id]['hex_image'] = image_hex
  44. def read1(self, backdata=None):
  45. """
  46. """
  47. # --- define ---
  48. image_hex = str()
  49. start_is = False
  50. print_is = False
  51. hex_start = ''
  52. hex_end = ''
  53. while True:
  54. # --- get ---
  55. line = self.client.raw.read(1)
  56. line = line.hex()
  57. # --- fill ---
  58. now_at = time.time()
  59. if not self.last_receive_at:
  60. self.last_receive_at = now_at
  61. if line:
  62. self.last_receive_at = now_at
  63. # --- fill ---
  64. hex_start += line
  65. # --- check ---
  66. if len(hex_start) > 8:
  67. hex_start = hex_start[-8:]
  68. # --- check ---
  69. if '0d0affd8' == hex_start:
  70. start_is = True
  71. image_hex = 'ff'
  72. methods.debug_log('hikvision_detector',
  73. f"m-44: 0d0affd8 is {datetime.datetime.now().strftime('%H:%M:%S.%f')}")
  74. # --- fill ---
  75. if start_is:
  76. image_hex += line
  77. # --- check ---
  78. if start_is:
  79. hex_end += line
  80. if len(hex_end) > 8:
  81. hex_end = hex_end[-8:]
  82. if 'ffd90d0a' == hex_end:
  83. print_is = True
  84. image_hex = image_hex[:-4]
  85. methods.debug_log('hikvision_detector',
  86. f"m-44: ffd90d0a is {datetime.datetime.now().strftime('%H:%M:%S.%f')}")
  87. # --- fill ---
  88. if print_is:
  89. self.throw(backdata, image_hex)
  90. image_hex = str()
  91. hex_start = str()
  92. hex_end = str()
  93. start_is = False
  94. print_is = False
  95. if __name__ == '__main__':
  96. # --- init ---
  97. middledata = {}
  98. agent = API()
  99. agent.connect_camera(camera_ipv4='192.168.0.181', camera_user='admin', camera_pass='DEVdev123')
  100. agent.read1({})