Spaces:
Running
on
Zero
Running
on
Zero
| from collections import defaultdict, OrderedDict | |
| import os | |
| import os.path as osp | |
| import numpy as np | |
| from tqdm import tqdm | |
| import argparse | |
| import cv2 | |
| from pathlib import Path | |
| import warnings | |
| import json | |
| import time | |
| from src.utils.metrics import estimate_pose, relative_pose_error, error_auc, symmetric_epipolar_distance_numpy | |
| from src.utils.plotting import dynamic_alpha, error_colormap, make_matching_figure | |
| # Loading functions for methods | |
| #################################################################### | |
| def load_xoftr(args): | |
| from src.xoftr import XoFTR | |
| from src.config.default import get_cfg_defaults | |
| from src.utils.data_io import DataIOWrapper, lower_config | |
| config = get_cfg_defaults(inference=True) | |
| config = lower_config(config) | |
| config["xoftr"]["match_coarse"]["thr"] = args.match_threshold | |
| config["xoftr"]["fine"]["thr"] = args.fine_threshold | |
| ckpt = args.ckpt | |
| matcher = XoFTR(config=config["xoftr"]) | |
| matcher = DataIOWrapper(matcher, config=config["test"], ckpt=ckpt) | |
| return matcher.from_paths | |
| #################################################################### | |
| def load_vis_tir_pairs_npz(npz_root, npz_list): | |
| """Load information for scene and image pairs from npz files. | |
| Args: | |
| npz_root: Directory path for npz files | |
| npz_list: File containing the names of the npz files to be used | |
| """ | |
| with open(npz_list, 'r') as f: | |
| npz_names = [name.split()[0] for name in f.readlines()] | |
| print(f"Parse {len(npz_names)} npz from {npz_list}.") | |
| total_pairs = 0 | |
| scene_pairs = {} | |
| for name in npz_names: | |
| print(f"Loading {name}") | |
| scene_info = np.load(f"{npz_root}/{name}", allow_pickle=True) | |
| pairs = [] | |
| # Collect pairs | |
| for pair_info in scene_info['pair_infos']: | |
| total_pairs += 1 | |
| (id0, id1) = pair_info | |
| im0 = scene_info['image_paths'][id0][0] | |
| im1 = scene_info['image_paths'][id1][1] | |
| K0 = scene_info['intrinsics'][id0][0].astype(np.float32) | |
| K1 = scene_info['intrinsics'][id1][1].astype(np.float32) | |
| dist0 = np.array(scene_info['distortion_coefs'][id0][0], dtype=float) | |
| dist1 = np.array(scene_info['distortion_coefs'][id1][1], dtype=float) | |
| # Compute relative pose | |
| T0 = scene_info['poses'][id0] | |
| T1 = scene_info['poses'][id1] | |
| T_0to1 = np.matmul(T1, np.linalg.inv(T0)) | |
| pairs.append({'im0':im0, 'im1':im1, 'dist0':dist0, 'dist1':dist1, | |
| 'K0':K0, 'K1':K1, 'T_0to1':T_0to1}) | |
| scene_pairs[name] = pairs | |
| print(f"Loaded {total_pairs} pairs.") | |
| return scene_pairs | |
| def save_matching_figure(path, img0, img1, mkpts0, mkpts1, inlier_mask, T_0to1, K0, K1, t_err=None, R_err=None, name=None, conf_thr = 5e-4): | |
| """ Make and save matching figures | |
| """ | |
| Tx = np.cross(np.eye(3), T_0to1[:3, 3]) | |
| E_mat = Tx @ T_0to1[:3, :3] | |
| mkpts0_inliers = mkpts0[inlier_mask] | |
| mkpts1_inliers = mkpts1[inlier_mask] | |
| if inlier_mask is not None and len(inlier_mask) != 0: | |
| epi_errs = symmetric_epipolar_distance_numpy(mkpts0_inliers, mkpts1_inliers, E_mat, K0, K1) | |
| correct_mask = epi_errs < conf_thr | |
| precision = np.mean(correct_mask) if len(correct_mask) > 0 else 0 | |
| n_correct = np.sum(correct_mask) | |
| # matching info | |
| alpha = dynamic_alpha(len(correct_mask)) | |
| color = error_colormap(epi_errs, conf_thr, alpha=alpha) | |
| text_precision =[ | |
| f'Precision({conf_thr:.2e}) ({100 * precision:.1f}%): {n_correct}/{len(mkpts0_inliers)}'] | |
| else: | |
| text_precision =[ | |
| f'No inliers after ransac'] | |
| if name is not None: | |
| text=[name] | |
| else: | |
| text = [] | |
| if t_err is not None and R_err is not None: | |
| error_text = [f"err_t: {t_err:.2f} °", f"err_R: {R_err:.2f} °"] | |
| text +=error_text | |
| text += text_precision | |
| # make the figure | |
| figure = make_matching_figure(img0, img1, mkpts0_inliers, mkpts1_inliers, | |
| color, text=text, path=path, dpi=150) | |
| def aggregiate_scenes(scene_pose_auc, thresholds): | |
| """Averages the auc results for cloudy_cloud and cloudy_sunny scenes | |
| """ | |
| temp_pose_auc = {} | |
| for npz_name in scene_pose_auc.keys(): | |
| scene_name = npz_name.split("_scene")[0] | |
| temp_pose_auc[scene_name] = [np.zeros(len(thresholds), dtype=np.float32), 0] # [sum, total_number] | |
| for npz_name in scene_pose_auc.keys(): | |
| scene_name = npz_name.split("_scene")[0] | |
| temp_pose_auc[scene_name][0] += scene_pose_auc[npz_name] | |
| temp_pose_auc[scene_name][1] += 1 | |
| agg_pose_auc = {} | |
| for scene_name in temp_pose_auc.keys(): | |
| agg_pose_auc[scene_name] = temp_pose_auc[scene_name][0] / temp_pose_auc[scene_name][1] | |
| return agg_pose_auc | |
| def eval_relapose( | |
| matcher, | |
| data_root, | |
| scene_pairs, | |
| ransac_thres, | |
| thresholds, | |
| save_figs, | |
| figures_dir=None, | |
| method=None, | |
| print_out=False, | |
| debug=False, | |
| ): | |
| scene_pose_auc = {} | |
| for scene_name in scene_pairs.keys(): | |
| scene_dir = osp.join(figures_dir, scene_name.split(".")[0]) | |
| if save_figs and not osp.exists(scene_dir): | |
| os.makedirs(scene_dir) | |
| pairs = scene_pairs[scene_name] | |
| statis = defaultdict(list) | |
| np.set_printoptions(precision=2) | |
| # Eval on pairs | |
| print(f"\nStart evaluation on VisTir \n") | |
| for i, pair in tqdm(enumerate(pairs), smoothing=.1, total=len(pairs)): | |
| if debug and i > 10: | |
| break | |
| T_0to1 = pair['T_0to1'] | |
| im0 = str(data_root / pair['im0']) | |
| im1 = str(data_root / pair['im1']) | |
| match_res = matcher(im0, im1, pair['K0'], pair['K1'], pair['dist0'], pair['dist1']) | |
| matches = match_res['matches'] | |
| new_K0 = match_res['new_K0'] | |
| new_K1 = match_res['new_K1'] | |
| mkpts0 = match_res['mkpts0'] | |
| mkpts1 = match_res['mkpts1'] | |
| # Calculate pose errors | |
| ret = estimate_pose( | |
| mkpts0, mkpts1, new_K0, new_K1, thresh=ransac_thres | |
| ) | |
| if ret is None: | |
| R, t, inliers = None, None, None | |
| t_err, R_err = np.inf, np.inf | |
| statis['failed'].append(i) | |
| statis['R_errs'].append(R_err) | |
| statis['t_errs'].append(t_err) | |
| statis['inliers'].append(np.array([]).astype(np.bool_)) | |
| else: | |
| R, t, inliers = ret | |
| t_err, R_err = relative_pose_error(T_0to1, R, t) | |
| statis['R_errs'].append(R_err) | |
| statis['t_errs'].append(t_err) | |
| statis['inliers'].append(inliers.sum() / len(mkpts0)) | |
| if print_out: | |
| print(f"#M={len(matches)} R={R_err:.3f}, t={t_err:.3f}") | |
| if save_figs: | |
| img0_name = f"{'vis' if 'visible' in pair['im0'] else 'tir'}_{osp.basename(pair['im0']).split('.')[0]}" | |
| img1_name = f"{'vis' if 'visible' in pair['im1'] else 'tir'}_{osp.basename(pair['im1']).split('.')[0]}" | |
| fig_path = osp.join(scene_dir, f"{img0_name}_{img1_name}.jpg") | |
| save_matching_figure(path=fig_path, | |
| img0=match_res['img0_undistorted'] if 'img0_undistorted' in match_res.keys() else match_res['img0'], | |
| img1=match_res['img1_undistorted'] if 'img1_undistorted' in match_res.keys() else match_res['img1'], | |
| mkpts0=mkpts0, | |
| mkpts1=mkpts1, | |
| inlier_mask=inliers, | |
| T_0to1=T_0to1, | |
| K0=new_K0, | |
| K1=new_K1, | |
| t_err=t_err, | |
| R_err=R_err, | |
| name=method | |
| ) | |
| print(f"Scene: {scene_name} Total samples: {len(pairs)} Failed:{len(statis['failed'])}. \n") | |
| pose_errors = np.max(np.stack([statis['R_errs'], statis['t_errs']]), axis=0) | |
| pose_auc = error_auc(pose_errors, thresholds) # (auc@5, auc@10, auc@20) | |
| scene_pose_auc[scene_name] = 100 * np.array([pose_auc[f'auc@{t}'] for t in thresholds]) | |
| print(f"{scene_name} {pose_auc}") | |
| agg_pose_auc = aggregiate_scenes(scene_pose_auc, thresholds) | |
| return scene_pose_auc, agg_pose_auc | |
| def test_relative_pose_vistir( | |
| data_root_dir, | |
| method="xoftr", | |
| exp_name = "VisTIR", | |
| ransac_thres=1.5, | |
| print_out=False, | |
| save_dir=None, | |
| save_figs=False, | |
| debug=False, | |
| args=None | |
| ): | |
| if not osp.exists(osp.join(save_dir, method)): | |
| os.makedirs(osp.join(save_dir, method)) | |
| counter = 0 | |
| path = osp.join(save_dir, method, f"{exp_name}"+"_{}") | |
| while osp.exists(path.format(counter)): | |
| counter += 1 | |
| exp_dir = path.format(counter) | |
| os.mkdir(exp_dir) | |
| results_file = osp.join(exp_dir, "results.json") | |
| figures_dir = osp.join(exp_dir, "match_figures") | |
| if save_figs: | |
| os.mkdir(figures_dir) | |
| # Init paths | |
| npz_root = data_root_dir / 'index/scene_info_test/' | |
| npz_list = data_root_dir / 'index/val_test_list/test_list.txt' | |
| data_root = data_root_dir | |
| # Load pairs | |
| scene_pairs = load_vis_tir_pairs_npz(npz_root, npz_list) | |
| # Load method | |
| matcher = eval(f"load_{method}")(args) | |
| thresholds=[5, 10, 20] | |
| # Eval | |
| scene_pose_auc, agg_pose_auc = eval_relapose( | |
| matcher, | |
| data_root, | |
| scene_pairs, | |
| ransac_thres=ransac_thres, | |
| thresholds=thresholds, | |
| save_figs=save_figs, | |
| figures_dir=figures_dir, | |
| method=method, | |
| print_out=print_out, | |
| debug=debug, | |
| ) | |
| # Create result dict | |
| results = OrderedDict({"method": method, | |
| "exp_name": exp_name, | |
| "ransac_thres": ransac_thres, | |
| "auc_thresholds": thresholds}) | |
| results.update({key:value for key, value in vars(args).items() if key not in results}) | |
| results.update({key:value.tolist() for key, value in agg_pose_auc.items()}) | |
| results.update({key:value.tolist() for key, value in scene_pose_auc.items()}) | |
| print(f"Results: {json.dumps(results, indent=4)}") | |
| # Save to json file | |
| with open(results_file, 'w') as outfile: | |
| json.dump(results, outfile, indent=4) | |
| print(f"Results saved to {results_file}") | |
| if __name__ == '__main__': | |
| def add_common_arguments(parser): | |
| parser.add_argument('--gpu', '-gpu', type=str, default='0') | |
| parser.add_argument('--exp_name', type=str, default="VisTIR") | |
| parser.add_argument('--data_root_dir', type=str, default="./data/METU_VisTIR/") | |
| parser.add_argument('--save_dir', type=str, default="./results_relative_pose") | |
| parser.add_argument('--ransac_thres', type=float, default=1.5) | |
| parser.add_argument('--print_out', action='store_true') | |
| parser.add_argument('--debug', action='store_true') | |
| parser.add_argument('--save_figs', action='store_true') | |
| def add_xoftr_arguments(subparsers): | |
| subcommand = subparsers.add_parser('xoftr') | |
| subcommand.add_argument('--match_threshold', type=float, default=0.3) | |
| subcommand.add_argument('--fine_threshold', type=float, default=0.1) | |
| subcommand.add_argument('--ckpt', type=str, default="./weights/weights_xoftr_640.ckpt") | |
| add_common_arguments(subcommand) | |
| parser = argparse.ArgumentParser(description='Benchmark Relative Pose') | |
| add_common_arguments(parser) | |
| # Create subparsers for top-level commands | |
| subparsers = parser.add_subparsers(dest="method") | |
| add_xoftr_arguments(subparsers) | |
| args = parser.parse_args() | |
| os.environ['CUDA_VISIBLE_DEVICES'] = "0" | |
| tt = time.time() | |
| with warnings.catch_warnings(): | |
| warnings.simplefilter("ignore") | |
| test_relative_pose_vistir( | |
| Path(args.data_root_dir), | |
| args.method, | |
| args.exp_name, | |
| ransac_thres=args.ransac_thres, | |
| print_out=args.print_out, | |
| save_dir = args.save_dir, | |
| save_figs = args.save_figs, | |
| debug=args.debug, | |
| args=args | |
| ) | |
| print(f"Elapsed time: {time.time() - tt}") | |