123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- import os
- import sys
- import onnx
- import onnxruntime as ort
- import cv2
- import numpy as np
- import time
# Human-readable label for every class id: the position in the list is the id
# (index 0 is 'person', index 79 is 'toothbrush').  `draw` looks labels up here
# and `filter_box` keeps only detections whose best class is index 0.
# NOTE(review): this looks like the 80-class COCO label set in YOLOv5 order --
# confirm it matches the labels the ONNX model was trained with.
CLASSES = [
    'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck',
    'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat',
    'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe',
    'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard',
    'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle',
    'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
    'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake',
    'chair', 'couch', 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop',
    'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
    'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush',
]
class Yolov5ONNX(object):
    """Run a YOLOv5 ONNX model through ONNX Runtime on letterboxed frames.

    Attributes:
        onnx_session: The ``onnxruntime.InferenceSession`` used for inference.
        input_name: List of the session's input tensor names.
        output_name: List of the session's output tensor names.
        input_size: ``(width, height)`` of the image fed to the network.
    """

    def __init__(self, onnx_path, input_size=(640, 640)):
        """Load and sanity-check the model, then open an inference session.

        Args:
            onnx_path: Path of the ``.onnx`` file to load.
            input_size: ``(width, height)`` the frames are letterboxed to.
                Defaults to 640x640; it has to agree with the input shape
                the model was exported with.
        """
        onnx_model = onnx.load(onnx_path)
        try:
            onnx.checker.check_model(onnx_model)
        except Exception:
            # Best-effort validation only: a failed check is reported but the
            # session below is still created.
            print("Model incorrect")
        else:
            print("Model correct")

        # Providers are listed in priority order: CUDA first, CPU as fallback.
        self.onnx_session = ort.InferenceSession(
            onnx_path,
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
        )
        if 'CUDAExecutionProvider' in self.onnx_session.get_providers():
            print("Using CUDA for inference.")
        else:
            print("CUDA is not available, using CPU for inference.")

        self.input_name = self.get_input_name()
        self.output_name = self.get_output_name()
        self.input_size = tuple(input_size)

    def get_input_name(self):
        """Return the names of all input tensors of the session as a list."""
        return [node.name for node in self.onnx_session.get_inputs()]

    def get_output_name(self):
        """Return the names of all output tensors of the session as a list."""
        return [node.name for node in self.onnx_session.get_outputs()]

    def get_input_feed(self, image_numpy):
        """Build the feed dict mapping every input name to ``image_numpy``."""
        return {name: image_numpy for name in self.input_name}

    def inference(self, img):
        """Letterbox ``img``, run the model and return the raw prediction.

        The frame is resized with its aspect ratio preserved, centred on a
        black canvas of ``input_size``, converted from BGR to RGB, laid out
        channel-first, scaled to [0, 1] and given a leading batch axis.

        Args:
            img: 3-D image array (height, width, channels).  The colour
                conversion treats it as BGR, as delivered by OpenCV.

        Returns:
            Tuple ``(pred, padded_img)``: ``pred`` is the first output of the
            session and ``padded_img`` is the letterboxed uint8 BGR canvas,
            i.e. the image whose pixel coordinates the network saw.
        """
        h, w, _ = img.shape
        target_w, target_h = self.input_size
        scale = min(target_w / w, target_h / h)
        # Clamp to one pixel so an extreme aspect ratio can never ask
        # cv2.resize for a zero-sized output.
        new_w = max(1, int(w * scale))
        new_h = max(1, int(h * scale))
        img_resized = cv2.resize(img, (new_w, new_h))

        # Centre the resized frame on a black canvas (letterboxing).
        padded_img = np.zeros((target_h, target_w, 3), dtype=np.uint8)
        pad_x = (target_w - new_w) // 2
        pad_y = (target_h - new_h) // 2
        padded_img[pad_y:pad_y + new_h, pad_x:pad_x + new_w] = img_resized

        # HWC BGR uint8 -> 1xCxHxW RGB float32 in [0, 1].
        img_rgb = cv2.cvtColor(padded_img, cv2.COLOR_BGR2RGB).transpose(2, 0, 1)
        img_rgb = img_rgb.astype(np.float32)
        img_rgb /= 255.0
        img_rgb = np.expand_dims(img_rgb, axis=0)

        input_feed = self.get_input_feed(img_rgb)
        # perf_counter is the clock intended for measuring short durations;
        # unlike time.time() it is unaffected by system clock adjustments.
        start_time = time.perf_counter()
        # Passing None requests every output; only the first one is used.
        pred = self.onnx_session.run(None, input_feed)[0]
        inference_time = time.perf_counter() - start_time
        print(f"Inference time: {inference_time:.4f} seconds")
        return pred, padded_img
def nms(dets, thresh):
    """Greedy non-maximum suppression on corner-format boxes.

    Boxes are visited from highest to lowest score; each visited box is kept
    and every remaining box whose IoU with it exceeds ``thresh`` is dropped.

    Args:
        dets: 2-D array whose first five columns are ``x1, y1, x2, y2, score``.
        thresh: IoU above which a lower-scoring box is suppressed.

    Returns:
        List of row indices into ``dets`` that survive, best score first.
    """
    left, top, right, bottom = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3]
    # Widths and heights use the inclusive-pixel convention (+1).
    box_areas = (bottom - top + 1) * (right - left + 1)
    confidences = dets[:, 4]

    kept = []
    order = confidences.argsort()[::-1]
    while order.size > 0:
        best, rest = order[0], order[1:]
        kept.append(best)

        # Intersection rectangle between the best box and every other one.
        inter_left = np.maximum(left[best], left[rest])
        inter_top = np.maximum(top[best], top[rest])
        inter_right = np.minimum(right[best], right[rest])
        inter_bottom = np.minimum(bottom[best], bottom[rest])
        inter_w = np.maximum(0, inter_right - inter_left + 1)
        inter_h = np.maximum(0, inter_bottom - inter_top + 1)
        intersection = inter_w * inter_h

        iou = intersection / (box_areas[best] + box_areas[rest] - intersection)
        # Carry on with the boxes that do not overlap the kept one too much.
        order = rest[iou <= thresh]
    return kept
def xywh2xyxy(x):
    """Convert boxes from centre/size form to corner form.

    Args:
        x: 2-D array whose first four columns are ``cx, cy, w, h``.

    Returns:
        A copy of ``x`` whose first four columns are ``x1, y1, x2, y2``;
        any further columns are carried over unchanged.  ``x`` itself is
        not modified.
    """
    centre_x, centre_y = x[:, 0], x[:, 1]
    half_w, half_h = x[:, 2] / 2, x[:, 3] / 2

    corners = np.copy(x)
    corners[:, 0] = centre_x - half_w  # left
    corners[:, 1] = centre_y - half_h  # top
    corners[:, 2] = centre_x + half_w  # right
    corners[:, 3] = centre_y + half_h  # bottom
    return corners
def filter_box(org_box, conf_thres, iou_thres):
    """Keep confident 'person' detections and de-duplicate them with NMS.

    Each row is read as ``[cx, cy, w, h, objectness, class scores...]``
    (assumed YOLOv5 raw output layout -- TODO confirm against the model).

    Args:
        org_box: Raw model prediction; singleton axes are squeezed away.
        conf_thres: Rows whose objectness (column 4) is not above this value
            are discarded.  Class scores are not part of this threshold.
        iou_thres: IoU threshold handed to ``nms``.

    Returns:
        An empty 1-D array when nothing survives.  Otherwise a 2-D array with
        one row per kept detection: columns 0-3 are ``x1, y1, x2, y2``,
        column 4 is the objectness score and column 5 is the class id
        (always 0).  The row width is unchanged; columns from 6 onwards still
        hold the remaining raw class scores.
    """
    org_box = np.squeeze(org_box)
    box = org_box[org_box[..., 4] > conf_thres]
    if box.size == 0:
        return np.array([])

    # Best-scoring class per row; only class id 0 (person) is of interest.
    class_ids = np.argmax(box[..., 5:], axis=-1)
    person_boxes = box[class_ids == 0]
    if len(person_boxes) == 0:
        return np.array([])

    person_boxes = xywh2xyxy(person_boxes)
    # Column 5 originally holds the person class *probability*.  `draw` reads
    # that column as an integer class id, so a probability of exactly 1.0
    # would be labelled as class 1.  Store the real class id instead.
    person_boxes[:, 5] = 0
    keep = nms(person_boxes, iou_thres)
    return person_boxes[keep]
def draw(image, box_data):
    """Draw every detection's rectangle and label onto ``image`` in place.

    Args:
        image: Image array handed to the OpenCV drawing calls; it is modified.
        box_data: Array whose last axis starts with
            ``x1, y1, x2, y2, score, class id``.  An empty array means there
            is nothing to draw.

    Returns:
        The same ``image`` object that was passed in.
    """
    if box_data.size == 0:
        return image

    corners = box_data[..., :4].astype(np.int32)
    confidences = box_data[..., 4]
    class_ids = box_data[..., 5].astype(np.int32)

    for (x1, y1, x2, y2), confidence, class_id in zip(corners, confidences, class_ids):
        cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 2)
        # Label text is "<class name> <score>", anchored at the top-left corner.
        cv2.putText(
            image,
            '{0} {1:.2f}'.format(CLASSES[class_id], confidence),
            (x1, y1),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (0, 0, 255),
            2,
        )
    return image
def main():
    """Grab camera frames, detect people and save five annotated images.

    Loads ``yolov5s.onnx``, opens camera index 7 through V4L2 at a requested
    1280x720, runs detection on at most one frame per second and writes an
    annotated frame to ``saved_images/`` no more often than every two
    seconds.  The loop ends after five images have been saved or when a
    frame cannot be read.

    Raises:
        SystemExit: With status 1 when the camera cannot be opened.
    """
    onnx_path = 'yolov5s.onnx'
    model = Yolov5ONNX(onnx_path)

    cap = cv2.VideoCapture(7, cv2.CAP_V4L2)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        if not cap.isOpened():
            print("无法打开摄像头")
            # Non-zero status so callers can tell the run failed.
            sys.exit(1)

        output_dir = "saved_images"
        os.makedirs(output_dir, exist_ok=True)

        # Frame-rate control: run detection on at most `frame_rate` frames
        # per second.
        frame_rate = 1
        min_interval = 1.0 / frame_rate

        frame_count = 0
        # A monotonic clock keeps the throttling immune to system clock changes.
        last_save_time = time.monotonic()
        prev_time = time.monotonic()

        while True:
            # Frames are read continuously and skipped while throttled.
            ret, frame = cap.read()
            if not ret:
                print("无法读取摄像头图像")
                break

            current_time = time.monotonic()
            if current_time - prev_time < min_interval:
                continue
            prev_time = current_time

            output, org_img = model.inference(frame)
            outbox = filter_box(output, 0.5, 0.5)
            org_img = draw(org_img, outbox)

            if time.monotonic() - last_save_time >= 2:
                frame_count += 1
                image_path = os.path.join(output_dir, f'result_{frame_count}.jpg')
                cv2.imwrite(image_path, org_img)
                print(f"Image saved: {image_path}")
                last_save_time = time.monotonic()

            if frame_count >= 5:
                print("保存了5张图片,程序退出")
                break
    finally:
        # Always hand the device back, including on sys.exit or an exception.
        cap.release()
        cv2.destroyAllWindows()
# Script entry point: run the capture/detection loop only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|