|
- from typing import Dict, List, Optional, Tuple
- import torch
- from torch import nn, Tensor
- from torch.nn import functional as F
- from torchvision.ops import boxes as box_ops, Conv2dNormActivation
- from . import _utils as det_utils
- # Import AnchorGenerator to keep compatibility.
- from .anchor_utils import AnchorGenerator # noqa: 401
- from .image_list import ImageList
- class RPNHead(nn.Module):
- """
- Adds a simple RPN Head with classification and regression heads
- Args:
- in_channels (int): number of channels of the input feature
- num_anchors (int): number of anchors to be predicted
- conv_depth (int, optional): number of convolutions
- """
- _version = 2
- def __init__(self, in_channels: int, num_anchors: int, conv_depth=1) -> None:
- super().__init__()
- convs = []
- for _ in range(conv_depth):
- convs.append(Conv2dNormActivation(in_channels, in_channels, kernel_size=3, norm_layer=None))
- self.conv = nn.Sequential(*convs)
- self.cls_logits = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1)
- self.bbox_pred = nn.Conv2d(in_channels, num_anchors * 4, kernel_size=1, stride=1)
- for layer in self.modules():
- if isinstance(layer, nn.Conv2d):
- torch.nn.init.normal_(layer.weight, std=0.01) # type: ignore[arg-type]
- if layer.bias is not None:
- torch.nn.init.constant_(layer.bias, 0) # type: ignore[arg-type]
- def _load_from_state_dict(
- self,
- state_dict,
- prefix,
- local_metadata,
- strict,
- missing_keys,
- unexpected_keys,
- error_msgs,
- ):
- version = local_metadata.get("version", None)
- if version is None or version < 2:
- for type in ["weight", "bias"]:
- old_key = f"{prefix}conv.{type}"
- new_key = f"{prefix}conv.0.0.{type}"
- if old_key in state_dict:
- state_dict[new_key] = state_dict.pop(old_key)
- super()._load_from_state_dict(
- state_dict,
- prefix,
- local_metadata,
- strict,
- missing_keys,
- unexpected_keys,
- error_msgs,
- )
- def forward(self, x: List[Tensor]) -> Tuple[List[Tensor], List[Tensor]]:
- logits = []
- bbox_reg = []
- for feature in x:
- t = self.conv(feature)
- logits.append(self.cls_logits(t))
- bbox_reg.append(self.bbox_pred(t))
- return logits, bbox_reg
- def permute_and_flatten(layer: Tensor, N: int, A: int, C: int, H: int, W: int) -> Tensor:
- layer = layer.view(N, -1, C, H, W)
- layer = layer.permute(0, 3, 4, 1, 2)
- layer = layer.reshape(N, -1, C)
- return layer
- def concat_box_prediction_layers(box_cls: List[Tensor], box_regression: List[Tensor]) -> Tuple[Tensor, Tensor]:
- box_cls_flattened = []
- box_regression_flattened = []
- # for each feature level, permute the outputs to make them be in the
- # same format as the labels. Note that the labels are computed for
- # all feature levels concatenated, so we keep the same representation
- # for the objectness and the box_regression
- for box_cls_per_level, box_regression_per_level in zip(box_cls, box_regression):
- N, AxC, H, W = box_cls_per_level.shape
- Ax4 = box_regression_per_level.shape[1]
- A = Ax4 // 4
- C = AxC // A
- box_cls_per_level = permute_and_flatten(box_cls_per_level, N, A, C, H, W)
- box_cls_flattened.append(box_cls_per_level)
- box_regression_per_level = permute_and_flatten(box_regression_per_level, N, A, 4, H, W)
- box_regression_flattened.append(box_regression_per_level)
- # concatenate on the first dimension (representing the feature levels), to
- # take into account the way the labels were generated (with all feature maps
- # being concatenated as well)
- box_cls = torch.cat(box_cls_flattened, dim=1).flatten(0, -2)
- box_regression = torch.cat(box_regression_flattened, dim=1).reshape(-1, 4)
- return box_cls, box_regression
- class RegionProposalNetwork(torch.nn.Module):
- """
- Implements Region Proposal Network (RPN).
- Args:
- anchor_generator (AnchorGenerator): module that generates the anchors for a set of feature
- maps.
- head (nn.Module): module that computes the objectness and regression deltas
- fg_iou_thresh (float): minimum IoU between the anchor and the GT box so that they can be
- considered as positive during training of the RPN.
- bg_iou_thresh (float): maximum IoU between the anchor and the GT box so that they can be
- considered as negative during training of the RPN.
- batch_size_per_image (int): number of anchors that are sampled during training of the RPN
- for computing the loss
- positive_fraction (float): proportion of positive anchors in a mini-batch during training
- of the RPN
- pre_nms_top_n (Dict[str, int]): number of proposals to keep before applying NMS. It should
- contain two fields: training and testing, to allow for different values depending
- on training or evaluation
- post_nms_top_n (Dict[str, int]): number of proposals to keep after applying NMS. It should
- contain two fields: training and testing, to allow for different values depending
- on training or evaluation
- nms_thresh (float): NMS threshold used for postprocessing the RPN proposals
- """
- __annotations__ = {
- "box_coder": det_utils.BoxCoder,
- "proposal_matcher": det_utils.Matcher,
- "fg_bg_sampler": det_utils.BalancedPositiveNegativeSampler,
- }
- def __init__(
- self,
- anchor_generator: AnchorGenerator,
- head: nn.Module,
- # Faster-RCNN Training
- fg_iou_thresh: float,
- bg_iou_thresh: float,
- batch_size_per_image: int,
- positive_fraction: float,
- # Faster-RCNN Inference
- pre_nms_top_n: Dict[str, int],
- post_nms_top_n: Dict[str, int],
- nms_thresh: float,
- score_thresh: float = 0.0,
- ) -> None:
- super().__init__()
- self.anchor_generator = anchor_generator
- self.head = head
- self.box_coder = det_utils.BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))
- # used during training
- self.box_similarity = box_ops.box_iou
- self.proposal_matcher = det_utils.Matcher(
- fg_iou_thresh,
- bg_iou_thresh,
- allow_low_quality_matches=True,
- )
- self.fg_bg_sampler = det_utils.BalancedPositiveNegativeSampler(batch_size_per_image, positive_fraction)
- # used during testing
- self._pre_nms_top_n = pre_nms_top_n
- self._post_nms_top_n = post_nms_top_n
- self.nms_thresh = nms_thresh
- self.score_thresh = score_thresh
- self.min_size = 1e-3
- def pre_nms_top_n(self) -> int:
- if self.training:
- return self._pre_nms_top_n["training"]
- return self._pre_nms_top_n["testing"]
- def post_nms_top_n(self) -> int:
- if self.training:
- return self._post_nms_top_n["training"]
- return self._post_nms_top_n["testing"]
- def assign_targets_to_anchors(
- self, anchors: List[Tensor], targets: List[Dict[str, Tensor]]
- ) -> Tuple[List[Tensor], List[Tensor]]:
- labels = []
- matched_gt_boxes = []
- for anchors_per_image, targets_per_image in zip(anchors, targets):
- gt_boxes = targets_per_image["boxes"]
- if gt_boxes.numel() == 0:
- # Background image (negative example)
- device = anchors_per_image.device
- matched_gt_boxes_per_image = torch.zeros(anchors_per_image.shape, dtype=torch.float32, device=device)
- labels_per_image = torch.zeros((anchors_per_image.shape[0],), dtype=torch.float32, device=device)
- else:
- match_quality_matrix = self.box_similarity(gt_boxes, anchors_per_image)
- matched_idxs = self.proposal_matcher(match_quality_matrix)
- # get the targets corresponding GT for each proposal
- # NB: need to clamp the indices because we can have a single
- # GT in the image, and matched_idxs can be -2, which goes
- # out of bounds
- matched_gt_boxes_per_image = gt_boxes[matched_idxs.clamp(min=0)]
- labels_per_image = matched_idxs >= 0
- labels_per_image = labels_per_image.to(dtype=torch.float32)
- # Background (negative examples)
- bg_indices = matched_idxs == self.proposal_matcher.BELOW_LOW_THRESHOLD
- labels_per_image[bg_indices] = 0.0
- # discard indices that are between thresholds
- inds_to_discard = matched_idxs == self.proposal_matcher.BETWEEN_THRESHOLDS
- labels_per_image[inds_to_discard] = -1.0
- labels.append(labels_per_image)
- matched_gt_boxes.append(matched_gt_boxes_per_image)
- return labels, matched_gt_boxes
- def _get_top_n_idx(self, objectness: Tensor, num_anchors_per_level: List[int]) -> Tensor:
- r = []
- offset = 0
- for ob in objectness.split(num_anchors_per_level, 1):
- num_anchors = ob.shape[1]
- pre_nms_top_n = det_utils._topk_min(ob, self.pre_nms_top_n(), 1)
- _, top_n_idx = ob.topk(pre_nms_top_n, dim=1)
- r.append(top_n_idx + offset)
- offset += num_anchors
- return torch.cat(r, dim=1)
- def filter_proposals(
- self,
- proposals: Tensor,
- objectness: Tensor,
- image_shapes: List[Tuple[int, int]],
- num_anchors_per_level: List[int],
- ) -> Tuple[List[Tensor], List[Tensor]]:
- num_images = proposals.shape[0]
- device = proposals.device
- # do not backprop through objectness
- objectness = objectness.detach()
- objectness = objectness.reshape(num_images, -1)
- levels = [
- torch.full((n,), idx, dtype=torch.int64, device=device) for idx, n in enumerate(num_anchors_per_level)
- ]
- levels = torch.cat(levels, 0)
- levels = levels.reshape(1, -1).expand_as(objectness)
- # select top_n boxes independently per level before applying nms
- top_n_idx = self._get_top_n_idx(objectness, num_anchors_per_level)
- image_range = torch.arange(num_images, device=device)
- batch_idx = image_range[:, None]
- objectness = objectness[batch_idx, top_n_idx]
- levels = levels[batch_idx, top_n_idx]
- proposals = proposals[batch_idx, top_n_idx]
- objectness_prob = torch.sigmoid(objectness)
- final_boxes = []
- final_scores = []
- for boxes, scores, lvl, img_shape in zip(proposals, objectness_prob, levels, image_shapes):
- boxes = box_ops.clip_boxes_to_image(boxes, img_shape)
- # remove small boxes
- keep = box_ops.remove_small_boxes(boxes, self.min_size)
- boxes, scores, lvl = boxes[keep], scores[keep], lvl[keep]
- # remove low scoring boxes
- # use >= for Backwards compatibility
- keep = torch.where(scores >= self.score_thresh)[0]
- boxes, scores, lvl = boxes[keep], scores[keep], lvl[keep]
- # non-maximum suppression, independently done per level
- keep = box_ops.batched_nms(boxes, scores, lvl, self.nms_thresh)
- # keep only topk scoring predictions
- keep = keep[: self.post_nms_top_n()]
- boxes, scores = boxes[keep], scores[keep]
- final_boxes.append(boxes)
- final_scores.append(scores)
- return final_boxes, final_scores
- def compute_loss(
- self, objectness: Tensor, pred_bbox_deltas: Tensor, labels: List[Tensor], regression_targets: List[Tensor]
- ) -> Tuple[Tensor, Tensor]:
- """
- Args:
- objectness (Tensor)
- pred_bbox_deltas (Tensor)
- labels (List[Tensor])
- regression_targets (List[Tensor])
- Returns:
- objectness_loss (Tensor)
- box_loss (Tensor)
- """
- sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels)
- sampled_pos_inds = torch.where(torch.cat(sampled_pos_inds, dim=0))[0]
- sampled_neg_inds = torch.where(torch.cat(sampled_neg_inds, dim=0))[0]
- sampled_inds = torch.cat([sampled_pos_inds, sampled_neg_inds], dim=0)
- objectness = objectness.flatten()
- labels = torch.cat(labels, dim=0)
- regression_targets = torch.cat(regression_targets, dim=0)
- box_loss = F.smooth_l1_loss(
- pred_bbox_deltas[sampled_pos_inds],
- regression_targets[sampled_pos_inds],
- beta=1 / 9,
- reduction="sum",
- ) / (sampled_inds.numel())
- objectness_loss = F.binary_cross_entropy_with_logits(objectness[sampled_inds], labels[sampled_inds])
- return objectness_loss, box_loss
- def forward(
- self,
- images: ImageList,
- features: Dict[str, Tensor],
- targets: Optional[List[Dict[str, Tensor]]] = None,
- ) -> Tuple[List[Tensor], Dict[str, Tensor]]:
- """
- Args:
- images (ImageList): images for which we want to compute the predictions
- features (Dict[str, Tensor]): features computed from the images that are
- used for computing the predictions. Each tensor in the list
- correspond to different feature levels
- targets (List[Dict[str, Tensor]]): ground-truth boxes present in the image (optional).
- If provided, each element in the dict should contain a field `boxes`,
- with the locations of the ground-truth boxes.
- Returns:
- boxes (List[Tensor]): the predicted boxes from the RPN, one Tensor per
- image.
- losses (Dict[str, Tensor]): the losses for the model during training. During
- testing, it is an empty dict.
- """
- # RPN uses all feature maps that are available
- features = list(features.values())
- objectness, pred_bbox_deltas = self.head(features)
- anchors = self.anchor_generator(images, features)
- num_images = len(anchors)
- num_anchors_per_level_shape_tensors = [o[0].shape for o in objectness]
- num_anchors_per_level = [s[0] * s[1] * s[2] for s in num_anchors_per_level_shape_tensors]
- objectness, pred_bbox_deltas = concat_box_prediction_layers(objectness, pred_bbox_deltas)
- # apply pred_bbox_deltas to anchors to obtain the decoded proposals
- # note that we detach the deltas because Faster R-CNN do not backprop through
- # the proposals
- proposals = self.box_coder.decode(pred_bbox_deltas.detach(), anchors)
- proposals = proposals.view(num_images, -1, 4)
- boxes, scores = self.filter_proposals(proposals, objectness, images.image_sizes, num_anchors_per_level)
- losses = {}
- if self.training:
- if targets is None:
- raise ValueError("targets should not be None")
- labels, matched_gt_boxes = self.assign_targets_to_anchors(anchors, targets)
- regression_targets = self.box_coder.encode(matched_gt_boxes, anchors)
- loss_objectness, loss_rpn_box_reg = self.compute_loss(
- objectness, pred_bbox_deltas, labels, regression_targets
- )
- losses = {
- "loss_objectness": loss_objectness,
- "loss_rpn_box_reg": loss_rpn_box_reg,
- }
- return boxes, losses
|