# NOTE(review): removed non-code scrape artifacts ("Spaces:" / "Runtime error") that preceded the file header.
| # -*- coding: utf-8 -*- | |
| # Copyright (c) Facebook, Inc. and its affiliates. | |
| import contextlib | |
| import copy | |
| import io | |
| import itertools | |
| import logging | |
| import numpy as np | |
| import os | |
| from collections import OrderedDict | |
| from typing import Dict, Iterable, List, Optional | |
| import pycocotools.mask as mask_utils | |
| import torch | |
| from pycocotools.coco import COCO | |
| from tabulate import tabulate | |
| from detectron2.config import CfgNode | |
| from detectron2.data import MetadataCatalog | |
| from detectron2.evaluation import DatasetEvaluator | |
| from detectron2.structures import BoxMode | |
| from detectron2.utils.comm import gather, get_rank, is_main_process, synchronize | |
| from detectron2.utils.file_io import PathManager | |
| from detectron2.utils.logger import create_small_table | |
| from densepose.converters import ToChartResultConverter, ToMaskConverter | |
| from densepose.data.datasets.coco import maybe_filter_and_map_categories_cocoapi | |
| from densepose.structures import ( | |
| DensePoseChartPredictorOutput, | |
| DensePoseEmbeddingPredictorOutput, | |
| quantize_densepose_chart_result, | |
| ) | |
| from .densepose_coco_evaluation import DensePoseCocoEval, DensePoseEvalMode | |
| from .mesh_alignment_evaluator import MeshAlignmentEvaluator | |
| from .tensor_storage import ( | |
| SingleProcessFileTensorStorage, | |
| SingleProcessRamTensorStorage, | |
| SingleProcessTensorStorage, | |
| SizeData, | |
| storage_gather, | |
| ) | |
class DensePoseCOCOEvaluator(DatasetEvaluator):
    """
    Evaluator for DensePose predictions on a COCO-format dataset.

    Accumulates per-image prediction dicts in `process`, then (optionally
    gathering across distributed workers) runs DensePose COCO evaluation
    (GPS, GPSM and segmentation AP/AR) in `evaluate`. Optionally also
    reports mesh alignment metrics when a vertex embedder is provided.
    """

    def __init__(
        self,
        dataset_name,
        distributed,
        output_dir=None,
        evaluator_type: str = "iuv",
        min_iou_threshold: float = 0.5,
        storage: Optional[SingleProcessTensorStorage] = None,
        embedder=None,
        should_evaluate_mesh_alignment: bool = False,
        mesh_alignment_mesh_names: Optional[List[str]] = None,
    ):
        """
        Args:
            dataset_name: name of a dataset registered in MetadataCatalog;
                its metadata must provide `json_file` and `class_to_mesh_name`
            distributed: if True, predictions are gathered from all workers
                and evaluation runs on the main process only
            output_dir: if given, raw predictions are dumped there as
                "coco_densepose_predictions.pth"
            evaluator_type: "iuv" (chart-based) or "cse" (continuous surface
                embeddings) — stored for reference, not branched on here
            min_iou_threshold: lowest IoU threshold used in the evaluation
            storage: optional tensor storage used to offload bulky per-instance
                tensors instead of keeping them inside the prediction dicts
            embedder: vertex embedder; required if mesh alignment is evaluated
            should_evaluate_mesh_alignment: whether to also report mesh
                alignment metrics (requires `embedder`)
            mesh_alignment_mesh_names: optional subset of mesh names used for
                mesh alignment evaluation
        """
        self._embedder = embedder
        self._distributed = distributed
        self._output_dir = output_dir
        self._evaluator_type = evaluator_type
        self._storage = storage
        self._should_evaluate_mesh_alignment = should_evaluate_mesh_alignment

        assert not (
            should_evaluate_mesh_alignment and embedder is None
        ), "Mesh alignment evaluation is activated, but no vertex embedder provided!"
        if should_evaluate_mesh_alignment:
            self._mesh_alignment_evaluator = MeshAlignmentEvaluator(
                embedder,
                mesh_alignment_mesh_names,
            )

        self._cpu_device = torch.device("cpu")
        self._logger = logging.getLogger(__name__)

        self._metadata = MetadataCatalog.get(dataset_name)
        self._min_threshold = min_iou_threshold
        json_file = PathManager.get_local_path(self._metadata.json_file)
        # COCO() prints progress to stdout while loading annotations; silence it
        with contextlib.redirect_stdout(io.StringIO()):
            self._coco_api = COCO(json_file)
        maybe_filter_and_map_categories_cocoapi(dataset_name, self._coco_api)

    def reset(self):
        # Per-evaluation accumulator of per-instance prediction dicts
        self._predictions = []

    def process(self, inputs, outputs):
        """
        Args:
            inputs: the inputs to a COCO model (e.g., GeneralizedRCNN).
                It is a list of dict. Each dict corresponds to an image and
                contains keys like "height", "width", "file_name", "image_id".
            outputs: the outputs of a COCO model. It is a list of dicts with key
                "instances" that contains :class:`Instances`.
                The :class:`Instances` object needs to have `densepose` field.
        """
        for input, output in zip(inputs, outputs):
            instances = output["instances"].to(self._cpu_device)
            if not instances.has("pred_densepose"):
                continue
            prediction_list = prediction_to_dict(
                instances,
                input["image_id"],
                self._embedder,
                self._metadata.class_to_mesh_name,
                self._storage is not None,
            )
            if self._storage is not None:
                # Offload the bulky tensor fields to the storage and keep only
                # the (record_id, rank) handle inside each prediction dict.
                for prediction_dict in prediction_list:
                    dict_to_store = {}
                    for field_name in self._storage.data_schema:
                        dict_to_store[field_name] = prediction_dict[field_name]
                    record_id = self._storage.put(dict_to_store)
                    prediction_dict["record_id"] = record_id
                    prediction_dict["rank"] = get_rank()
                    for field_name in self._storage.data_schema:
                        del prediction_dict[field_name]
            self._predictions.extend(prediction_list)

    def evaluate(self, img_ids=None):
        """
        Run the evaluation over all predictions accumulated so far.

        In distributed mode, gathers predictions (and the tensor storage)
        from all workers first; non-main workers return None.

        Returns:
            OrderedDict of evaluation results (deep-copied), or None on
            non-main processes.
        """
        if self._distributed:
            synchronize()
            predictions = gather(self._predictions)
            predictions = list(itertools.chain(*predictions))
        else:
            predictions = self._predictions
        multi_storage = storage_gather(self._storage) if self._storage is not None else None

        if not is_main_process():
            return
        return copy.deepcopy(self._eval_predictions(predictions, multi_storage, img_ids))

    def _eval_predictions(self, predictions, multi_storage=None, img_ids=None):
        """
        Evaluate predictions on densepose.
        Return results with the metrics of the tasks.
        """
        self._logger.info("Preparing results for COCO format ...")

        if self._output_dir:
            # Dump raw predictions for offline inspection / re-evaluation
            PathManager.mkdirs(self._output_dir)
            file_path = os.path.join(self._output_dir, "coco_densepose_predictions.pth")
            with PathManager.open(file_path, "wb") as f:
                torch.save(predictions, f)

        self._logger.info("Evaluating predictions ...")
        res = OrderedDict()
        results_gps, results_gpsm, results_segm = _evaluate_predictions_on_coco(
            self._coco_api,
            predictions,
            multi_storage,
            self._embedder,
            class_names=self._metadata.get("thing_classes"),
            min_threshold=self._min_threshold,
            img_ids=img_ids,
        )
        res["densepose_gps"] = results_gps
        res["densepose_gpsm"] = results_gpsm
        res["densepose_segm"] = results_segm
        if self._should_evaluate_mesh_alignment:
            res["densepose_mesh_alignment"] = self._evaluate_mesh_alignment()
        return res

    def _evaluate_mesh_alignment(self):
        """
        Run mesh alignment evaluation; returns a dict with mean GE/GPS
        (scaled to percent) plus per-mesh entries "GE-<mesh>" / "GPS-<mesh>".
        """
        self._logger.info("Mesh alignment evaluation ...")
        mean_ge, mean_gps, per_mesh_metrics = self._mesh_alignment_evaluator.evaluate()
        results = {
            "GE": mean_ge * 100,
            "GPS": mean_gps * 100,
        }
        mesh_names = set()
        for metric_name in per_mesh_metrics:
            for mesh_name, value in per_mesh_metrics[metric_name].items():
                results[f"{metric_name}-{mesh_name}"] = value * 100
                mesh_names.add(mesh_name)
        self._print_mesh_alignment_results(results, mesh_names)
        return results

    def _print_mesh_alignment_results(self, results: Dict[str, float], mesh_names: Iterable[str]):
        # Log a small markdown-style table of per-mesh and mean GE/GPS values;
        # missing entries are rendered as blanks.
        self._logger.info("Evaluation results for densepose, mesh alignment:")
        self._logger.info(f'| {"Mesh":13s} | {"GErr":7s} | {"GPS":7s} |')
        self._logger.info("| :-----------: | :-----: | :-----: |")
        for mesh_name in mesh_names:
            ge_key = f"GE-{mesh_name}"
            ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " "
            gps_key = f"GPS-{mesh_name}"
            gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " "
            self._logger.info(f"| {mesh_name:13s} | {ge_str:7s} | {gps_str:7s} |")
        self._logger.info("| :-------------------------------: |")
        ge_key = "GE"
        ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " "
        gps_key = "GPS"
        gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " "
        self._logger.info(f'| {"MEAN":13s} | {ge_str:7s} | {gps_str:7s} |')
def prediction_to_dict(instances, img_id, embedder, class_to_mesh_name, use_storage):
    """
    Convert the model output for one image into COCO-style result dicts.

    Args:
        instances (Instances): the output of the model
        img_id (str): the image id in COCO
        embedder: vertex embedder (forwarded to CSE conversion)
        class_to_mesh_name (dict): class index -> mesh name (CSE only)
        use_storage (bool): whether densepose tensors will be offloaded to a
            tensor storage (affects which chart conversion is used)

    Returns:
        list[dict]: the results in densepose evaluation format, one dict per
            detected instance with "image_id", "category_id", "bbox", "score"
            plus the densepose-specific fields

    Raises:
        ValueError: if `instances.pred_densepose` has an unsupported type
    """
    scores = instances.scores.tolist()
    classes = instances.pred_classes.tolist()
    # Clone before in-place conversion so pred_boxes remains XYXY for callers
    raw_boxes_xywh = BoxMode.convert(
        instances.pred_boxes.tensor.clone(), BoxMode.XYXY_ABS, BoxMode.XYWH_ABS
    )

    if isinstance(instances.pred_densepose, DensePoseEmbeddingPredictorOutput):
        results_densepose = densepose_cse_predictions_to_dict(
            instances, embedder, class_to_mesh_name, use_storage
        )
    elif isinstance(instances.pred_densepose, DensePoseChartPredictorOutput):
        if not use_storage:
            results_densepose = densepose_chart_predictions_to_dict(instances)
        else:
            results_densepose = densepose_chart_predictions_to_storage_dict(instances)
    else:
        # Previously this case fell through and produced an obscure NameError
        # ("results_densepose" unbound); fail explicitly instead.
        raise ValueError(
            f"Unsupported DensePose predictor output type: {type(instances.pred_densepose)}"
        )

    results = []
    for k in range(len(instances)):
        result = {
            "image_id": img_id,
            "category_id": classes[k],
            "bbox": raw_boxes_xywh[k].tolist(),
            "score": scores[k],
        }
        results.append({**result, **results_densepose[k]})
    return results
def densepose_chart_predictions_to_dict(instances):
    """
    Build per-instance result dicts for chart-based DensePose predictions.

    Each dict carries a quantized (uint8) chart result under "densepose" and
    a COCO RLE-encoded instance mask under "segmentation".
    """
    masks = ToMaskConverter.convert(
        instances.pred_densepose, instances.pred_boxes, instances.image_size
    )
    results = []
    for idx in range(len(instances)):
        chart_result = ToChartResultConverter.convert(
            instances.pred_densepose[idx], instances.pred_boxes[idx]
        )
        quantized = quantize_densepose_chart_result(chart_result)
        quantized.labels_uv_uint8 = quantized.labels_uv_uint8.cpu()
        # pycocotools expects a Fortran-contiguous uint8 array
        mask_np = np.require(masks.tensor[idx].numpy(), dtype=np.uint8, requirements=["F"])
        rle = mask_utils.encode(mask_np)
        # RLE "counts" comes back as bytes; decode for JSON serializability
        rle["counts"] = rle["counts"].decode("utf-8")
        results.append({"densepose": quantized, "segmentation": rle})
    return results
def densepose_chart_predictions_to_storage_dict(instances):
    """
    Extract raw chart predictor tensors for offloading to a tensor storage.

    Returns one dict per instance with CPU tensors for "coarse_segm",
    "fine_segm", "u" and "v" (leading batch dim of size 1 squeezed away).
    """
    out = []
    for idx in range(len(instances)):
        predictor_output = instances.pred_densepose[idx]
        out.append(
            {
                field: getattr(predictor_output, field).squeeze(0).cpu()
                for field in ("coarse_segm", "fine_segm", "u", "v")
            }
        )
    return out
def densepose_cse_predictions_to_dict(instances, embedder, class_to_mesh_name, use_storage):
    """
    Extract CSE predictor tensors for each instance.

    Returns one dict per instance with CPU tensors "coarse_segm" and
    "embedding" (first element of the leading dim selected). The `embedder`,
    `class_to_mesh_name` and `use_storage` arguments are accepted for
    signature compatibility with the chart-based converters.
    """
    return [
        {
            "coarse_segm": instances.pred_densepose[i].coarse_segm[0].cpu(),
            "embedding": instances.pred_densepose[i].embedding[0].cpu(),
        }
        for i in range(len(instances))
    ]
def _evaluate_predictions_on_coco(
    coco_gt,
    coco_results,
    multi_storage=None,
    embedder=None,
    class_names=None,
    min_threshold: float = 0.5,
    img_ids=None,
):
    """
    Run DensePose COCO evaluation in GPS, GPSM and IOU modes.

    Args:
        coco_gt: COCO object with ground truth annotations
        coco_results (list[dict]): predictions in densepose evaluation format
        multi_storage: optional gathered tensor storage with offloaded tensors
        embedder: optional vertex embedder (for CSE results)
        class_names: optional category names for per-category AP reporting
        min_threshold: lowest IoU threshold used to build the threshold grid
        img_ids: optional subset of image ids to evaluate on

    Returns:
        (results_gps, results_gpsm, results_segm): three metric-name -> value
        dicts; all values are -1 when there are no predictions.
    """
    logger = logging.getLogger(__name__)

    densepose_metrics = _get_densepose_metrics(min_threshold)
    if len(coco_results) == 0:  # cocoapi does not handle empty results very well
        # logger.warn is a deprecated alias of logger.warning
        logger.warning("No predictions from the model! Set scores to -1")
        results_gps = {metric: -1 for metric in densepose_metrics}
        results_gpsm = {metric: -1 for metric in densepose_metrics}
        results_segm = {metric: -1 for metric in densepose_metrics}
        return results_gps, results_gpsm, results_segm

    coco_dt = coco_gt.loadRes(coco_results)

    results = []
    for eval_mode_name in ["GPS", "GPSM", "IOU"]:
        eval_mode = getattr(DensePoseEvalMode, eval_mode_name)
        coco_eval = DensePoseCocoEval(
            coco_gt, coco_dt, "densepose", multi_storage, embedder, dpEvalMode=eval_mode
        )
        result = _derive_results_from_coco_eval(
            coco_eval, eval_mode_name, densepose_metrics, class_names, min_threshold, img_ids
        )
        results.append(result)
    return results
| def _get_densepose_metrics(min_threshold: float = 0.5): | |
| metrics = ["AP"] | |
| if min_threshold <= 0.201: | |
| metrics += ["AP20"] | |
| if min_threshold <= 0.301: | |
| metrics += ["AP30"] | |
| if min_threshold <= 0.401: | |
| metrics += ["AP40"] | |
| metrics.extend(["AP50", "AP75", "APm", "APl", "AR", "AR50", "AR75", "ARm", "ARl"]) | |
| return metrics | |
def _derive_results_from_coco_eval(
    coco_eval, eval_mode_name, metrics, class_names, min_threshold: float, img_ids
):
    """
    Run a configured DensePoseCocoEval and extract named metrics.

    Args:
        coco_eval: a DensePoseCocoEval instance (not yet evaluated)
        eval_mode_name (str): evaluation mode label used in log messages
        metrics (list[str]): metric names matching the order of coco_eval.stats
        class_names: optional category names; if more than one, per-category
            AP is also computed and added under "AP-<name>" keys
        min_threshold (float): lower bound of the IoU threshold grid
        img_ids: optional subset of image ids to restrict evaluation to

    Returns:
        dict: metric name -> value (percent)
    """
    if img_ids is not None:
        coco_eval.params.imgIds = img_ids
    # IoU thresholds from min_threshold to 0.95 in steps of 0.05
    coco_eval.params.iouThrs = np.linspace(
        min_threshold, 0.95, int(np.round((0.95 - min_threshold) / 0.05)) + 1, endpoint=True
    )
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
    # coco_eval.stats is assumed to be ordered consistently with `metrics`
    results = {metric: float(coco_eval.stats[idx] * 100) for idx, metric in enumerate(metrics)}
    logger = logging.getLogger(__name__)
    logger.info(
        f"Evaluation results for densepose, {eval_mode_name} metric: \n"
        + create_small_table(results)
    )
    if class_names is None or len(class_names) <= 1:
        return results

    # Compute per-category AP, the same way as it is done in D2
    # (see detectron2/evaluation/coco_evaluation.py):
    precisions = coco_eval.eval["precision"]
    # precision has dims (iou, recall, cls, area range, max dets)
    assert len(class_names) == precisions.shape[2]

    results_per_category = []
    for idx, name in enumerate(class_names):
        # area range index 0: all area ranges
        # max dets index -1: typically 100 per image
        precision = precisions[:, :, idx, 0, -1]
        # -1 marks entries with no matching ground truth; exclude from the mean
        precision = precision[precision > -1]
        ap = np.mean(precision) if precision.size else float("nan")
        results_per_category.append((f"{name}", float(ap * 100)))

    # tabulate it
    n_cols = min(6, len(results_per_category) * 2)
    results_flatten = list(itertools.chain(*results_per_category))
    results_2d = itertools.zip_longest(*[results_flatten[i::n_cols] for i in range(n_cols)])
    table = tabulate(
        results_2d,
        tablefmt="pipe",
        floatfmt=".3f",
        headers=["category", "AP"] * (n_cols // 2),
        numalign="left",
    )
    logger.info(f"Per-category {eval_mode_name} AP: \n" + table)
    # NOTE: the per-category values were already scaled to percent above
    results.update({"AP-" + name: ap for name, ap in results_per_category})
    return results
def build_densepose_evaluator_storage(cfg: CfgNode, output_folder: str):
    """
    Construct a tensor storage for DensePose evaluation from the config.

    Args:
        cfg: config node; DENSEPOSE_EVALUATION.STORAGE selects the backend
            ("none", "ram" or "file") and DENSEPOSE_EVALUATION.TYPE the data
            schema ("iuv" or "cse")
        output_folder: destination directory for file-backed storage

    Returns:
        A SingleProcess*TensorStorage instance, or None if storage is disabled.

    Raises:
        ValueError: on an unknown evaluator type or storage specification
    """
    storage_spec = cfg.DENSEPOSE_EVALUATION.STORAGE
    if storage_spec == "none":
        return None
    evaluator_type = cfg.DENSEPOSE_EVALUATION.TYPE
    # Common output tensor sizes (heatmaps are square)
    hout = cfg.MODEL.ROI_DENSEPOSE_HEAD.HEATMAP_SIZE
    wout = cfg.MODEL.ROI_DENSEPOSE_HEAD.HEATMAP_SIZE
    n_csc = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_COARSE_SEGM_CHANNELS
    # Evaluator-type-specific schema of the stored tensors
    if evaluator_type == "iuv":
        n_fsc = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_PATCHES + 1
        chart_shape = (n_fsc, hout, wout)
        schema = {
            "coarse_segm": SizeData(dtype="float32", shape=(n_csc, hout, wout)),
            "fine_segm": SizeData(dtype="float32", shape=chart_shape),
            "u": SizeData(dtype="float32", shape=chart_shape),
            "v": SizeData(dtype="float32", shape=chart_shape),
        }
    elif evaluator_type == "cse":
        embed_size = cfg.MODEL.ROI_DENSEPOSE_HEAD.CSE.EMBED_SIZE
        schema = {
            "coarse_segm": SizeData(dtype="float32", shape=(n_csc, hout, wout)),
            "embedding": SizeData(dtype="float32", shape=(embed_size, hout, wout)),
        }
    else:
        raise ValueError(f"Unknown evaluator type: {evaluator_type}")
    # Storage backend selection
    if storage_spec == "ram":
        return SingleProcessRamTensorStorage(schema, io.BytesIO())
    if storage_spec == "file":
        # One file per rank so distributed workers do not clobber each other
        fpath = os.path.join(output_folder, f"DensePoseEvaluatorStorage.{get_rank()}.bin")
        PathManager.mkdirs(output_folder)
        return SingleProcessFileTensorStorage(schema, fpath, "wb")
    raise ValueError(f"Unknown storage specification: {storage_spec}")