1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- import os
- import gc
- import cv2
- import imutils
- import threading
- import time # 导入时间模块
- from AIDetector_pytorch import Detector
- import logging
- import argparse # 导入argparse库
# Event used to tell the detection thread(s) to exit: the capture loop polls
# it on every iteration and main() sets it when the user asks to stop.
stop_event = threading.Event()
def start_camera_detector(camera_id, width, height, detector, output_directory):
    """Capture frames from one camera, run detection, and save annotated images.

    The loop runs until the module-level ``stop_event`` is set or the camera
    stops delivering frames. Every third frame is passed to
    ``detector.feedCap``; the image found under the ``'frame'`` key of its
    result is resized to a height of 500 px and written to
    ``output_directory`` as ``camera_<id>_frame_<n>.jpg``.

    Args:
        camera_id: Device index handed to ``cv2.VideoCapture`` (V4L2 backend).
        width: Capture width requested from the device, in pixels.
        height: Capture height requested from the device, in pixels.
        detector: Object exposing ``feedCap(frame)`` whose result can be
            indexed with ``'frame'``.
        output_directory: Directory that receives the saved images. It is
            not created here.

    Returns:
        None. If the camera cannot be opened an error is printed and the
        function returns immediately.
    """
    name = 'Demo Camera {}'.format(camera_id)
    cap = cv2.VideoCapture(camera_id, cv2.CAP_V4L2)
    try:
        if not cap.isOpened():
            print('Error: Unable to open camera {}.'.format(camera_id))
            return

        cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # The driver did not report a usable rate; fall back to 30.
            fps = 30
        print(f'{name} fps:', fps)

        frame_count = 0
        while not stop_event.is_set():  # Poll the stop request each frame.
            ret, im = cap.read()
            if not ret or im is None:
                logging.warning(
                    '%s: failed to read frame %d, stopping capture.',
                    name, frame_count,
                )
                break

            # Only every third frame is analysed; the others are dropped.
            if frame_count % 3 == 0:
                start_time = time.time()
                result = detector.feedCap(im)
                result_frame = result['frame']
                elapsed_time = time.time() - start_time

                result_frame = imutils.resize(result_frame, height=500)

                # Save the result image; imwrite reports failure through its
                # return value rather than by raising.
                filename = os.path.join(
                    output_directory,
                    f'camera_{camera_id}_frame_{frame_count}.jpg',
                )
                if cv2.imwrite(filename, result_frame):
                    print(f'Saved image: {filename}')
                else:
                    logging.error('Failed to save image: %s', filename)

                logging.info(
                    'Frame %d processed in %.4f seconds.',
                    frame_count, elapsed_time,
                )

            frame_count += 1
            if frame_count % 30 == 0:
                # Periodic explicit collection, once every 30 frames.
                gc.collect()
    finally:
        # Always hand the device back, even if detection or saving raised.
        cap.release()
def main():
    """Parse CLI options and run the camera detector until the user stops it.

    A worker thread runs ``start_camera_detector`` for the selected camera
    while the main thread waits for Enter. On Enter or Ctrl-C the shared
    ``stop_event`` is set and the worker is joined. When stdin is not
    interactive (``input`` raises ``EOFError``) the main thread instead
    waits for the worker to finish on its own.
    """
    parser = argparse.ArgumentParser(description='Camera Detection with ONNX.')
    parser.add_argument('--camera_id', type=int, default=1, help='Camera ID to use.')
    parser.add_argument('--width', type=int, default=1280,
                        help='Input the width of the video image(default=1280)')
    parser.add_argument('--height', type=int, default=720,
                        help='Input the height of the video image(default=720)')
    args = parser.parse_args()

    # Without a configured handler the INFO-level timing messages emitted by
    # the detector loop are discarded. basicConfig is a no-op if the root
    # logger already has handlers.
    logging.basicConfig(level=logging.INFO)

    detector = Detector()

    output_directory = 'output_images'
    os.makedirs(output_directory, exist_ok=True)  # Create output directory if not exists

    threads = []
    worker = threading.Thread(
        target=start_camera_detector,
        args=(args.camera_id, args.width, args.height, detector, output_directory),
    )
    worker.start()
    threads.append(worker)

    try:
        # Wait for the user to request a stop.
        input("Press Enter to stop the camera detection...\n")
    except EOFError:
        # No interactive stdin: keep detecting until the workers end by
        # themselves (Ctrl-C during this wait still reaches the cleanup).
        for running in threads:
            running.join()
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal stop request.
        pass
    finally:
        # Signal the stop and wait for every worker so the camera is released
        # and the process can exit instead of hanging on a live thread.
        stop_event.set()
        for running in threads:
            running.join()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|