Spaces:
Runtime error
Runtime error
| # Copyright (c) Facebook, Inc. and its affiliates. | |
| import copy | |
| import inspect | |
| import logging | |
| import numpy as np | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import torch | |
| from torch import nn | |
| from torch.nn import functional as F | |
| from detectron2.config import configurable | |
| from detectron2.layers import ShapeSpec, nonzero_tuple, batched_nms, cat | |
| from detectron2.structures import Boxes, ImageList, Instances, pairwise_iou, pairwise_ioa | |
| from detectron2.utils.events import get_event_storage | |
| from detectron2.utils.registry import Registry | |
| from detectron2.modeling.backbone.resnet import BottleneckBlock, ResNet | |
| from detectron2.modeling.matcher import Matcher | |
| from detectron2.modeling.poolers import ROIPooler | |
| from detectron2.modeling.sampling import subsample_labels | |
| from detectron2.modeling.box_regression import Box2BoxTransform | |
| from detectron2.modeling.roi_heads.fast_rcnn import fast_rcnn_inference | |
| from detectron2.modeling.roi_heads.box_head import build_box_head | |
| from detectron2.modeling import ROI_HEADS_REGISTRY, StandardROIHeads | |
| from detectron2.modeling.roi_heads.roi_heads import Res5ROIHeads | |
| from detectron2.modeling.roi_heads.cascade_rcnn import CascadeROIHeads, _ScaleGradient | |
| from .vlpart_fast_rcnn import VLMFastRCNNOutputLayers | |
def build_vlpart_roi_heads(cfg, input_shape):
    """Create the ROI heads used by VLPart.

    Args:
        cfg: configuration object, forwarded unchanged to
            ``CascadeVLMROIHeads``.
        input_shape: forwarded unchanged to ``CascadeVLMROIHeads``.

    Returns:
        The constructed ``CascadeVLMROIHeads`` instance.
    """
    roi_heads = CascadeVLMROIHeads(cfg, input_shape)
    return roi_heads
class CascadeVLMROIHeads(CascadeROIHeads):
    """Cascade ROI heads whose per-stage box predictors also take a text embedding.

    Only inference is supported: both ``forward`` and ``_forward_box`` assert
    that the module is not in training mode.
    """

    @classmethod
    def _init_box_head(cls, cfg, input_shape):
        """Build the box pooler and the per-stage heads, predictors and matchers.

        This is a classmethod because detectron2's
        ``StandardROIHeads.from_config`` only calls ``_init_box_head`` when it
        is a bound method of the class; a plain function would be skipped and
        the box-related constructor arguments would be missing.

        Args:
            cfg: configuration object; the ``MODEL.ROI_HEADS``,
                ``MODEL.ROI_BOX_HEAD`` and ``MODEL.ROI_BOX_CASCADE_HEAD``
                nodes are read.
            input_shape: mapping from feature name to an object exposing
                ``stride`` and ``channels``.

        Returns:
            dict with the keys ``box_in_features``, ``box_pooler``,
            ``box_heads``, ``box_predictors`` and ``proposal_matchers``.

        Raises:
            AssertionError: if the cascade IoU / regression-weight lists differ
                in length, class-agnostic box regression is disabled, the first
                cascade IoU differs from ``ROI_HEADS.IOU_THRESHOLDS[0]``, or the
                selected input features do not all have the same channel count.
        """
        # fmt: off
        in_features              = cfg.MODEL.ROI_HEADS.IN_FEATURES
        pooler_resolution        = cfg.MODEL.ROI_BOX_HEAD.POOLER_RESOLUTION
        pooler_scales            = tuple(1.0 / input_shape[k].stride for k in in_features)
        sampling_ratio           = cfg.MODEL.ROI_BOX_HEAD.POOLER_SAMPLING_RATIO
        pooler_type              = cfg.MODEL.ROI_BOX_HEAD.POOLER_TYPE
        cascade_bbox_reg_weights = cfg.MODEL.ROI_BOX_CASCADE_HEAD.BBOX_REG_WEIGHTS
        cascade_ious             = cfg.MODEL.ROI_BOX_CASCADE_HEAD.IOUS
        assert len(cascade_bbox_reg_weights) == len(cascade_ious)
        assert cfg.MODEL.ROI_BOX_HEAD.CLS_AGNOSTIC_BBOX_REG, \
            "CascadeROIHeads only support class-agnostic regression now!"
        assert cascade_ious[0] == cfg.MODEL.ROI_HEADS.IOU_THRESHOLDS[0]
        # fmt: on

        # If StandardROIHeads is applied on multiple feature maps (as in FPN),
        # then we share the same predictors and therefore the channel counts
        # must be the same.
        in_channels = [input_shape[f].channels for f in in_features]
        # Check all channel counts are equal
        assert len(set(in_channels)) == 1, in_channels
        in_channels = in_channels[0]

        box_pooler = ROIPooler(
            output_size=pooler_resolution,
            scales=pooler_scales,
            sampling_ratio=sampling_ratio,
            pooler_type=pooler_type,
        )
        pooled_shape = ShapeSpec(
            channels=in_channels, height=pooler_resolution, width=pooler_resolution
        )

        # One (head, predictor, matcher) triple per cascade stage.
        box_heads, box_predictors, proposal_matchers = [], [], []
        for match_iou, bbox_reg_weights in zip(cascade_ious, cascade_bbox_reg_weights):
            box_head = build_box_head(cfg, pooled_shape)
            box_heads.append(box_head)
            box_predictors.append(
                VLMFastRCNNOutputLayers(
                    box_head.output_shape,
                    box2box_transform=Box2BoxTransform(weights=bbox_reg_weights),
                )
            )
            proposal_matchers.append(
                Matcher([match_iou], [0, 1], allow_low_quality_matches=False)
            )
        return {
            "box_in_features": in_features,
            "box_pooler": box_pooler,
            "box_heads": box_heads,
            "box_predictors": box_predictors,
            "proposal_matchers": proposal_matchers,
        }

    def forward(self, images, features, proposals, text_embed):
        """Run the inference-only cascade box heads.

        Args:
            images: unused; the reference is dropped immediately.
            features: mapping indexed by the names in ``self.box_in_features``.
            proposals: per-image proposal objects exposing ``image_size`` and
                ``proposal_boxes``.
            text_embed: forwarded unchanged to every stage's box predictor.

        Returns:
            A tuple ``(pred_instances, {})``: the output of
            ``forward_with_given_boxes`` applied to the box predictions, and
            an empty dict.

        Raises:
            AssertionError: if the module is in training mode.
        """
        del images
        assert not self.training, 'only support inference now'
        pred_instances = self._forward_box(
            features, proposals, text_embed=text_embed)
        pred_instances = self.forward_with_given_boxes(features, pred_instances)
        return pred_instances, {}

    def _forward_box(self, features, proposals, text_embed):
        """Run every cascade stage and produce the final box predictions.

        Each stage after the first uses the boxes predicted by the previous
        stage as its proposals. The scores are averaged over all stages and
        the boxes are taken from the last stage.

        Args:
            features: mapping indexed by the names in ``self.box_in_features``.
            proposals: per-image proposal objects exposing ``image_size`` and
                ``proposal_boxes``.
            text_embed: forwarded unchanged to every stage's box predictor.

        Returns:
            The first element of the ``fast_rcnn_inference`` result.

        Raises:
            AssertionError: if the module is in training mode.
        """
        # Fail before doing any work. Training-time proposal matching is not
        # implemented here; the earlier in-loop branch referenced the
        # undefined names ``ann_type`` and ``targets``.
        assert not self.training, 'only support inference now'

        features = [features[f] for f in self.box_in_features]
        head_outputs = []  # (predictor, predictions, proposals)
        prev_pred_boxes = None
        image_sizes = [x.image_size for x in proposals]
        for k in range(self.num_cascade_stages):
            if k > 0:
                # Later stages refine the previous stage's predicted boxes.
                proposals = self._create_proposals_from_boxes(
                    prev_pred_boxes, image_sizes)
            predictions = self._run_stage(features, proposals, k, text_embed)
            stage_predictor = self.box_predictor[k]
            prev_pred_boxes = stage_predictor.predict_boxes(
                (predictions[0], predictions[1]), proposals)
            head_outputs.append((stage_predictor, predictions, proposals))

        # Each is a list[Tensor] of length #image. Each tensor is Ri x (K+1)
        scores_per_stage = [h[0].predict_probs(h[1], h[2]) for h in head_outputs]
        # Average the scores across all stages, image by image.
        scores = [
            sum(scores_per_image) * (1.0 / self.num_cascade_stages)
            for scores_per_image in zip(*scores_per_stage)
        ]
        # Boxes come from the last stage only.
        predictor, predictions, proposals = head_outputs[-1]
        boxes = predictor.predict_boxes((predictions[0], predictions[1]), proposals)
        pred_instances, _ = fast_rcnn_inference(
            boxes,
            scores,
            image_sizes,
            predictor.test_score_thresh,
            predictor.test_nms_thresh,
            predictor.test_topk_per_image,
        )
        return pred_instances

    def _create_proposals_from_boxes(self, boxes, image_sizes):
        """Wrap predicted boxes as proposals for the next cascade stage.

        Args:
            boxes: per-image box tensors; each is detached before use.
            image_sizes: per-image sizes, aligned with ``boxes``.

        Returns:
            list of ``Instances``, one per image, each carrying the clipped
            boxes in its ``proposal_boxes`` field.
        """
        # Detach so that no gradient flows back through the proposals.
        boxes = [Boxes(b.detach()) for b in boxes]
        proposals = []
        for boxes_per_image, image_size in zip(boxes, image_sizes):
            boxes_per_image.clip(image_size)
            prop = Instances(image_size)
            prop.proposal_boxes = boxes_per_image
            proposals.append(prop)
        return proposals

    def _run_stage(self, features, proposals, stage, text_embed):
        """Pool box features and run one cascade stage's head and predictor.

        Args:
            features: list of feature maps passed to ``self.box_pooler``.
            proposals: per-image objects exposing ``proposal_boxes``.
            stage: index into ``self.box_head`` and ``self.box_predictor``.
            text_embed: forwarded unchanged to the stage's box predictor.

        Returns:
            The output of ``self.box_predictor[stage]``.
        """
        pool_boxes = [x.proposal_boxes for x in proposals]
        box_features = self.box_pooler(features, pool_boxes)
        # The pooled features feed every stage, so scale by 1 / num_stages.
        box_features = _ScaleGradient.apply(box_features, 1.0 / self.num_cascade_stages)
        box_features = self.box_head[stage](box_features)
        return self.box_predictor[stage](box_features, text_embed)