"""Run a detector on frames from a camera and save the annotated results.

A worker thread captures frames from one camera, passes every
``DETECT_EVERY_N_FRAMES``-th frame to the detector and writes the returned
frame to disk as a JPEG. The main thread waits for the user to press Enter
and then asks the worker to stop.
"""

import argparse
import gc
import logging
import os
import threading
import time

import cv2
import imutils

from AIDetector_pytorch import Detector

# Event used to tell the detection thread(s) to exit.
stop_event = threading.Event()

# Only every N-th captured frame is passed to the detector.
DETECT_EVERY_N_FRAMES = 3
# A garbage collection is forced every N captured frames.
GC_EVERY_N_FRAMES = 30
# Height, in pixels, that result frames are resized to before being saved.
SAVED_FRAME_HEIGHT = 500


def start_camera_detector(camera_id, width, height, detector, output_directory):
    """Capture frames from a camera, run the detector and save the results.

    The loop runs until ``stop_event`` is set or a frame can no longer be
    read from the camera.

    Args:
        camera_id: Index of the camera, passed to ``cv2.VideoCapture``
            together with the ``cv2.CAP_V4L2`` backend flag.
        width: Requested capture width in pixels.
        height: Requested capture height in pixels.
        detector: Object exposing ``feedCap(frame)``; the value it returns is
            indexed with ``'frame'`` to obtain the image that is saved.
        output_directory: Directory the JPEG files are written to.
    """
    name = 'Demo Camera {}'.format(camera_id)
    cap = cv2.VideoCapture(camera_id, cv2.CAP_V4L2)
    if not cap.isOpened():
        print('Error: Unable to open camera {}.'.format(camera_id))
        return

    # From here on the capture device is open, so make sure it is released
    # even if the detector or the image writer raises.
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # The driver did not report a usable rate; fall back to 30.
            fps = 30
        print(f'{name} fps:', fps)

        frame_count = 0
        while not stop_event.is_set():  # Check the stop event.
            ret, im = cap.read()
            if not ret or im is None:
                logging.warning(
                    '%s: failed to read a frame; stopping capture.', name)
                break

            if frame_count % DETECT_EVERY_N_FRAMES == 0:
                start_time = time.time()
                result = detector.feedCap(im)
                result_frame = result['frame']
                elapsed_time = time.time() - start_time

                result_frame = imutils.resize(
                    result_frame, height=SAVED_FRAME_HEIGHT)

                # Save the result image.
                filename = os.path.join(
                    output_directory,
                    f'camera_{camera_id}_frame_{frame_count}.jpg')
                # cv2.imwrite reports failure through its return value, so
                # only announce the file when it was actually written.
                if cv2.imwrite(filename, result_frame):
                    print(f'Saved image: {filename}')
                else:
                    logging.error('Failed to write image: %s', filename)

                logging.info(
                    'Frame %d processed in %.4f seconds.',
                    frame_count, elapsed_time)

            frame_count += 1
            if frame_count % GC_EVERY_N_FRAMES == 0:
                gc.collect()
    finally:
        cap.release()


def main():
    """Parse command-line options, start the detection thread and wait.

    The detection thread is stopped when the user presses Enter. It is also
    stopped when the prompt is interrupted (Ctrl+C) or standard input is
    closed, so the non-daemon worker thread never outlives the prompt.
    """
    parser = argparse.ArgumentParser(description='Camera Detection with ONNX.')
    parser.add_argument('--camera_id', type=int, default=1,
                        help='Camera ID to use.')
    parser.add_argument('--width', type=int, default=1280,
                        help='Input the width of the video image(default=1280)')
    parser.add_argument('--height', type=int, default=720,
                        help='Input the height of the video image(default=720)')
    args = parser.parse_args()

    # Without a configured handler the root logger only emits WARNING and
    # above, which would hide the per-frame timing messages. This call does
    # nothing if the root logger already has handlers.
    logging.basicConfig(level=logging.INFO)

    detector = Detector()
    output_directory = 'output_images'
    # Create the output directory if it does not exist.
    os.makedirs(output_directory, exist_ok=True)

    threads = []
    thread = threading.Thread(
        target=start_camera_detector,
        args=(args.camera_id, args.width, args.height, detector,
              output_directory))
    thread.start()
    threads.append(thread)

    try:
        # Wait for user input to stop the program.
        input("Press Enter to stop the camera detection...\n")
    except KeyboardInterrupt:
        # Ctrl+C is treated as a normal stop request.
        print()
    finally:
        # Set the stop event, then wait for all threads to finish. This also
        # runs when input() raises (for example EOFError on a closed stdin),
        # after which the exception continues to propagate.
        stop_event.set()
        for thread in threads:
            thread.join()


if __name__ == "__main__":
    main()