import os
import sys
import time

import cv2
import numpy as np
import onnx
import onnxruntime as ort

# COCO class names; only 'person' (index 0) is kept after filtering.
CLASSES = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat',
           'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat',
           'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack',
           'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
           'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
           'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
           'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair',
           'couch', 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse',
           'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
           'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
           'toothbrush']


class Yolov5ONNX(object):
    def __init__(self, onnx_path):
        # Validate the ONNX graph before creating the runtime session.
        onnx_model = onnx.load(onnx_path)
        try:
            onnx.checker.check_model(onnx_model)
        except Exception:
            print("Model incorrect")
        else:
            print("Model correct")

        self.onnx_session = ort.InferenceSession(
            onnx_path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
        providers = self.onnx_session.get_providers()
        if 'CUDAExecutionProvider' in providers:
            print("Using CUDA for inference.")
        else:
            print("CUDA is not available, using CPU for inference.")

        self.input_name = self.get_input_name()
        self.output_name = self.get_output_name()
        self.input_size = (640, 640)

    def get_input_name(self):
        input_name = []
        for node in self.onnx_session.get_inputs():
            input_name.append(node.name)
        return input_name

    def get_output_name(self):
        output_name = []
        for node in self.onnx_session.get_outputs():
            output_name.append(node.name)
        return output_name

    def get_input_feed(self, image_numpy):
        input_feed = {}
        for name in self.input_name:
            input_feed[name] = image_numpy
        return input_feed

    def inference(self, img):
        """Letterbox the BGR frame to 640x640, run the model, and return
        (raw prediction, letterboxed image)."""
        h, w, _ = img.shape
        new_w, new_h = self.input_size
        scale = min(new_w / w, new_h / h)
        new_w = int(w * scale)
        new_h = int(h * scale)

        # Resize while keeping the aspect ratio, then pad to 640x640.
        img_resized = cv2.resize(img, (new_w, new_h))
        padded_img = np.zeros((self.input_size[1], self.input_size[0], 3), dtype=np.uint8)
        pad_x = (self.input_size[0] - new_w) // 2
        pad_y = (self.input_size[1] - new_h) // 2
        padded_img[pad_y:pad_y + new_h, pad_x:pad_x + new_w] = img_resized

        # HWC BGR uint8 -> CHW RGB float32 in [0, 1], plus a batch dimension.
        img_rgb = cv2.cvtColor(padded_img, cv2.COLOR_BGR2RGB).transpose(2, 0, 1)
        img_rgb = img_rgb.astype(np.float32)
        img_rgb /= 255.0
        img_rgb = np.expand_dims(img_rgb, axis=0)

        input_feed = self.get_input_feed(img_rgb)
        start_time = time.time()
        pred = self.onnx_session.run(None, input_feed)[0]
        end_time = time.time()
        inference_time = end_time - start_time
        print(f"Inference time: {inference_time:.4f} seconds")
        return pred, padded_img


def nms(dets, thresh):
    """IoU-based non-maximum suppression on rows of [x1, y1, x2, y2, score, ...]."""
    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]
    areas = (y2 - y1 + 1) * (x2 - x1 + 1)
    scores = dets[:, 4]
    keep = []
    index = scores.argsort()[::-1]
    while index.size > 0:
        i = index[0]
        keep.append(i)
        # Intersection of the highest-scoring box with the remaining boxes.
        x11 = np.maximum(x1[i], x1[index[1:]])
        y11 = np.maximum(y1[i], y1[index[1:]])
        x22 = np.minimum(x2[i], x2[index[1:]])
        y22 = np.minimum(y2[i], y2[index[1:]])
        w = np.maximum(0, x22 - x11 + 1)
        h = np.maximum(0, y22 - y11 + 1)
        overlaps = w * h
        ious = overlaps / (areas[i] + areas[index[1:]] - overlaps)
        # Keep only boxes whose IoU with the current box is below the threshold.
        idx = np.where(ious <= thresh)[0]
        index = index[idx + 1]
    return keep


def xywh2xyxy(x):
    """Convert (cx, cy, w, h) boxes to (x1, y1, x2, y2)."""
    y = np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2
    y[:, 1] = x[:, 1] - x[:, 3] / 2
    y[:, 2] = x[:, 0] + x[:, 2] / 2
    y[:, 3] = x[:, 1] + x[:, 3] / 2
    return y


def filter_box(org_box, conf_thres, iou_thres):
    """Keep detections above conf_thres, select the 'person' class, and apply NMS."""
    org_box = np.squeeze(org_box)
    conf = org_box[..., 4] > conf_thres
    box = org_box[conf]
    if box.size == 0:
        return np.array([])

    cls_conf = box[..., 5:]
    cls = [int(np.argmax(cls_conf[i])) for i in range(len(cls_conf))]
    person_boxes = [box[i] for i in range(len(cls)) if cls[i] == 0]  # class 0 is 'person'
    if len(person_boxes) == 0:
        return np.array([])

    person_boxes = np.array(person_boxes)
    person_boxes = xywh2xyxy(person_boxes)
    keep = nms(person_boxes, iou_thres)
    output = [person_boxes[k] for k in keep]
    return np.array(output)


def draw(image, box_data):
    """Draw the detected person boxes and scores on the letterboxed image."""
    if box_data.size == 0:
        return image

    boxes = box_data[..., :4].astype(np.int32)
    scores = box_data[..., 4]
    # filter_box() only returns person detections, so the label is always CLASSES[0].
    for box, score in zip(boxes, scores):
        x1, y1, x2, y2 = box
        cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 2)
        cv2.putText(image, '{0} {1:.2f}'.format(CLASSES[0], score),
                    (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
    return image


def main():
    onnx_path = 'yolov5s.onnx'
    model = Yolov5ONNX(onnx_path)

    # Open the V4L2 camera at index 7 and request 1280x720 frames.
    cap = cv2.VideoCapture(7, cv2.CAP_V4L2)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
    if not cap.isOpened():
        print("Unable to open the camera")
        sys.exit(0)

    last_save_time = time.time()
    frame_count = 0
    output_dir = "saved_images"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Throttle processing to at most `frame_rate` frames per second.
    frame_rate = 1
    prev_time = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            print("Unable to read a frame from the camera")
            break

        current_time = time.time()
        if current_time - prev_time >= 1.0 / frame_rate:
            prev_time = current_time

            output, org_img = model.inference(frame)
            outbox = filter_box(output, 0.5, 0.5)
            org_img = draw(org_img, outbox)

            # Save an annotated frame at most once every 2 seconds, up to 5 images.
            if time.time() - last_save_time >= 2:
                frame_count += 1
                image_path = os.path.join(output_dir, f'result_{frame_count}.jpg')
                cv2.imwrite(image_path, org_img)
                print(f"Image saved: {image_path}")
                last_save_time = time.time()

            if frame_count >= 5:
                print("Saved 5 images, exiting")
                break

    cap.release()
    cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
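
# Usage sketch (assumption, not part of the original script): this code expects a YOLOv5s
# model already exported to ONNX as 'yolov5s.onnx' next to the script, and a V4L2 camera
# available at index 7. With the official ultralytics/yolov5 repository, the export is
# typically done with:
#   python export.py --weights yolov5s.pt --include onnx
# The detector is then started by running this file directly:
#   python this_script.py   # replace with the actual filename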