Spaces:
Build error
Build error
# Copyright (c) OpenMMLab. All rights reserved. | |
import argparse | |
import os | |
import pickle | |
import shutil | |
from os.path import join | |
import cv2 | |
import h5py | |
import mmcv | |
import numpy as np | |
from scipy.io import loadmat | |
train_subjects = [i for i in range(1, 9)] | |
test_subjects = [i for i in range(1, 7)] | |
train_seqs = [1, 2] | |
train_cams = [0, 1, 2, 4, 5, 6, 7, 8] | |
train_frame_nums = { | |
(1, 1): 6416, | |
(1, 2): 12430, | |
(2, 1): 6502, | |
(2, 2): 6081, | |
(3, 1): 12488, | |
(3, 2): 12283, | |
(4, 1): 6171, | |
(4, 2): 6675, | |
(5, 1): 12820, | |
(5, 2): 12312, | |
(6, 1): 6188, | |
(6, 2): 6145, | |
(7, 1): 6239, | |
(7, 2): 6320, | |
(8, 1): 6468, | |
(8, 2): 6054 | |
} | |
test_frame_nums = {1: 6151, 2: 6080, 3: 5838, 4: 6007, 5: 320, 6: 492} | |
train_img_size = (2048, 2048) | |
root_index = 14 | |
joints_17 = [7, 5, 14, 15, 16, 9, 10, 11, 23, 24, 25, 18, 19, 20, 4, 3, 6] | |
def get_pose_stats(kps): | |
"""Get statistic information `mean` and `std` of pose data. | |
Args: | |
kps (ndarray): keypoints in shape [..., K, C] where K and C is | |
the keypoint category number and dimension. | |
Returns: | |
mean (ndarray): [K, C] | |
""" | |
assert kps.ndim > 2 | |
K, C = kps.shape[-2:] | |
kps = kps.reshape(-1, K, C) | |
mean = kps.mean(axis=0) | |
std = kps.std(axis=0) | |
return mean, std | |
def get_annotations(joints_2d, joints_3d, scale_factor=1.2): | |
"""Get annotations, including centers, scales, joints_2d and joints_3d. | |
Args: | |
joints_2d: 2D joint coordinates in shape [N, K, 2], where N is the | |
frame number, K is the joint number. | |
joints_3d: 3D joint coordinates in shape [N, K, 3], where N is the | |
frame number, K is the joint number. | |
scale_factor: Scale factor of bounding box. Default: 1.2. | |
Returns: | |
centers (ndarray): [N, 2] | |
scales (ndarray): [N,] | |
joints_2d (ndarray): [N, K, 3] | |
joints_3d (ndarray): [N, K, 4] | |
""" | |
# calculate joint visibility | |
visibility = (joints_2d[:, :, 0] >= 0) * \ | |
(joints_2d[:, :, 0] < train_img_size[0]) * \ | |
(joints_2d[:, :, 1] >= 0) * \ | |
(joints_2d[:, :, 1] < train_img_size[1]) | |
visibility = np.array(visibility, dtype=np.float32)[:, :, None] | |
joints_2d = np.concatenate([joints_2d, visibility], axis=-1) | |
joints_3d = np.concatenate([joints_3d, visibility], axis=-1) | |
# calculate bounding boxes | |
bboxes = np.stack([ | |
np.min(joints_2d[:, :, 0], axis=1), | |
np.min(joints_2d[:, :, 1], axis=1), | |
np.max(joints_2d[:, :, 0], axis=1), | |
np.max(joints_2d[:, :, 1], axis=1) | |
], | |
axis=1) | |
centers = np.stack([(bboxes[:, 0] + bboxes[:, 2]) / 2, | |
(bboxes[:, 1] + bboxes[:, 3]) / 2], | |
axis=1) | |
scales = scale_factor * np.max(bboxes[:, 2:] - bboxes[:, :2], axis=1) / 200 | |
return centers, scales, joints_2d, joints_3d | |
def load_trainset(data_root, out_dir): | |
"""Load training data, create annotation file and camera file. | |
Args: | |
data_root: Directory of dataset, which is organized in the following | |
hierarchy: | |
data_root | |
|-- train | |
|-- S1 | |
|-- Seq1 | |
|-- Seq2 | |
|-- S2 | |
|-- ... | |
|-- test | |
|-- TS1 | |
|-- TS2 | |
|-- ... | |
out_dir: Directory to save annotation file. | |
""" | |
_imgnames = [] | |
_centers = [] | |
_scales = [] | |
_joints_2d = [] | |
_joints_3d = [] | |
cameras = {} | |
img_dir = join(out_dir, 'images') | |
os.makedirs(img_dir, exist_ok=True) | |
annot_dir = join(out_dir, 'annotations') | |
os.makedirs(annot_dir, exist_ok=True) | |
for subj in train_subjects: | |
for seq in train_seqs: | |
seq_path = join(data_root, 'train', f'S{subj}', f'Seq{seq}') | |
num_frames = train_frame_nums[(subj, seq)] | |
# load camera parametres | |
camera_file = join(seq_path, 'camera.calibration') | |
with open(camera_file, 'r') as fin: | |
lines = fin.readlines() | |
for cam in train_cams: | |
K = [float(s) for s in lines[cam * 7 + 5][11:-2].split()] | |
f = np.array([[K[0]], [K[5]]]) | |
c = np.array([[K[2]], [K[6]]]) | |
RT = np.array( | |
[float(s) for s in lines[cam * 7 + 6][11:-2].split()]) | |
RT = np.reshape(RT, (4, 4)) | |
R = RT[:3, :3] | |
# convert unit from millimeter to meter | |
T = RT[:3, 3:] * 0.001 | |
size = [int(s) for s in lines[cam * 7 + 3][14:].split()] | |
w, h = size | |
cam_param = dict( | |
R=R, T=T, c=c, f=f, w=w, h=h, name=f'train_cam_{cam}') | |
cameras[f'S{subj}_Seq{seq}_Cam{cam}'] = cam_param | |
# load annotations | |
annot_file = os.path.join(seq_path, 'annot.mat') | |
annot2 = loadmat(annot_file)['annot2'] | |
annot3 = loadmat(annot_file)['annot3'] | |
for cam in train_cams: | |
# load 2D and 3D annotations | |
joints_2d = np.reshape(annot2[cam][0][:num_frames], | |
(num_frames, 28, 2))[:, joints_17] | |
joints_3d = np.reshape(annot3[cam][0][:num_frames], | |
(num_frames, 28, 3))[:, joints_17] | |
joints_3d = joints_3d * 0.001 | |
centers, scales, joints_2d, joints_3d = get_annotations( | |
joints_2d, joints_3d) | |
_centers.append(centers) | |
_scales.append(scales) | |
_joints_2d.append(joints_2d) | |
_joints_3d.append(joints_3d) | |
# extract frames from video | |
video_path = join(seq_path, 'imageSequence', | |
f'video_{cam}.avi') | |
video = mmcv.VideoReader(video_path) | |
for i in mmcv.track_iter_progress(range(num_frames)): | |
img = video.read() | |
if img is None: | |
break | |
imgname = f'S{subj}_Seq{seq}_Cam{cam}_{i+1:06d}.jpg' | |
_imgnames.append(imgname) | |
cv2.imwrite(join(img_dir, imgname), img) | |
_imgnames = np.array(_imgnames) | |
_centers = np.concatenate(_centers) | |
_scales = np.concatenate(_scales) | |
_joints_2d = np.concatenate(_joints_2d) | |
_joints_3d = np.concatenate(_joints_3d) | |
out_file = join(annot_dir, 'mpi_inf_3dhp_train.npz') | |
np.savez( | |
out_file, | |
imgname=_imgnames, | |
center=_centers, | |
scale=_scales, | |
part=_joints_2d, | |
S=_joints_3d) | |
print(f'Create annotation file for trainset: {out_file}. ' | |
f'{len(_imgnames)} samples in total.') | |
out_file = join(annot_dir, 'cameras_train.pkl') | |
with open(out_file, 'wb') as fout: | |
pickle.dump(cameras, fout) | |
print(f'Create camera file for trainset: {out_file}.') | |
# get `mean` and `std` of pose data | |
_joints_3d = _joints_3d[..., :3] # remove visibility | |
mean_3d, std_3d = get_pose_stats(_joints_3d) | |
_joints_2d = _joints_2d[..., :2] # remove visibility | |
mean_2d, std_2d = get_pose_stats(_joints_2d) | |
# centered around root | |
_joints_3d_rel = _joints_3d - _joints_3d[..., root_index:root_index + 1, :] | |
mean_3d_rel, std_3d_rel = get_pose_stats(_joints_3d_rel) | |
mean_3d_rel[root_index] = mean_3d[root_index] | |
std_3d_rel[root_index] = std_3d[root_index] | |
_joints_2d_rel = _joints_2d - _joints_2d[..., root_index:root_index + 1, :] | |
mean_2d_rel, std_2d_rel = get_pose_stats(_joints_2d_rel) | |
mean_2d_rel[root_index] = mean_2d[root_index] | |
std_2d_rel[root_index] = std_2d[root_index] | |
stats = { | |
'joint3d_stats': { | |
'mean': mean_3d, | |
'std': std_3d | |
}, | |
'joint2d_stats': { | |
'mean': mean_2d, | |
'std': std_2d | |
}, | |
'joint3d_rel_stats': { | |
'mean': mean_3d_rel, | |
'std': std_3d_rel | |
}, | |
'joint2d_rel_stats': { | |
'mean': mean_2d_rel, | |
'std': std_2d_rel | |
} | |
} | |
for name, stat_dict in stats.items(): | |
out_file = join(annot_dir, f'{name}.pkl') | |
with open(out_file, 'wb') as f: | |
pickle.dump(stat_dict, f) | |
print(f'Create statistic data file: {out_file}') | |
def load_testset(data_root, out_dir, valid_only=True): | |
"""Load testing data, create annotation file and camera file. | |
Args: | |
data_root: Directory of dataset. | |
out_dir: Directory to save annotation file. | |
valid_only: Only keep frames with valid_label == 1. | |
""" | |
_imgnames = [] | |
_centers = [] | |
_scales = [] | |
_joints_2d = [] | |
_joints_3d = [] | |
cameras = {} | |
img_dir = join(out_dir, 'images') | |
os.makedirs(img_dir, exist_ok=True) | |
annot_dir = join(out_dir, 'annotations') | |
os.makedirs(annot_dir, exist_ok=True) | |
for subj in test_subjects: | |
subj_path = join(data_root, 'test', f'TS{subj}') | |
num_frames = test_frame_nums[subj] | |
# load annotations | |
annot_file = os.path.join(subj_path, 'annot_data.mat') | |
with h5py.File(annot_file, 'r') as fin: | |
annot2 = np.array(fin['annot2']).reshape((-1, 17, 2)) | |
annot3 = np.array(fin['annot3']).reshape((-1, 17, 3)) | |
valid = np.array(fin['valid_frame']).reshape(-1) | |
# manually estimate camera intrinsics | |
fx, cx = np.linalg.lstsq( | |
annot3[:, :, [0, 2]].reshape((-1, 2)), | |
(annot2[:, :, 0] * annot3[:, :, 2]).reshape(-1, 1), | |
rcond=None)[0].flatten() | |
fy, cy = np.linalg.lstsq( | |
annot3[:, :, [1, 2]].reshape((-1, 2)), | |
(annot2[:, :, 1] * annot3[:, :, 2]).reshape(-1, 1), | |
rcond=None)[0].flatten() | |
if subj <= 4: | |
w, h = 2048, 2048 | |
else: | |
w, h = 1920, 1080 | |
cameras[f'TS{subj}'] = dict( | |
c=np.array([[cx], [cy]]), | |
f=np.array([[fx], [fy]]), | |
w=w, | |
h=h, | |
name=f'test_cam_{subj}') | |
# get annotations | |
if valid_only: | |
valid_frames = np.nonzero(valid)[0] | |
else: | |
valid_frames = np.arange(num_frames) | |
joints_2d = annot2[valid_frames, :, :] | |
joints_3d = annot3[valid_frames, :, :] * 0.001 | |
centers, scales, joints_2d, joints_3d = get_annotations( | |
joints_2d, joints_3d) | |
_centers.append(centers) | |
_scales.append(scales) | |
_joints_2d.append(joints_2d) | |
_joints_3d.append(joints_3d) | |
# copy and rename images | |
for i in valid_frames: | |
imgname = f'TS{subj}_{i+1:06d}.jpg' | |
shutil.copyfile( | |
join(subj_path, 'imageSequence', f'img_{i+1:06d}.jpg'), | |
join(img_dir, imgname)) | |
_imgnames.append(imgname) | |
_imgnames = np.array(_imgnames) | |
_centers = np.concatenate(_centers) | |
_scales = np.concatenate(_scales) | |
_joints_2d = np.concatenate(_joints_2d) | |
_joints_3d = np.concatenate(_joints_3d) | |
if valid_only: | |
out_file = join(annot_dir, 'mpi_inf_3dhp_test_valid.npz') | |
else: | |
out_file = join(annot_dir, 'mpi_inf_3dhp_test_all.npz') | |
np.savez( | |
out_file, | |
imgname=_imgnames, | |
center=_centers, | |
scale=_scales, | |
part=_joints_2d, | |
S=_joints_3d) | |
print(f'Create annotation file for testset: {out_file}. ' | |
f'{len(_imgnames)} samples in total.') | |
out_file = join(annot_dir, 'cameras_test.pkl') | |
with open(out_file, 'wb') as fout: | |
pickle.dump(cameras, fout) | |
print(f'Create camera file for testset: {out_file}.') | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
parser.add_argument('data_root', type=str, help='data root') | |
parser.add_argument( | |
'out_dir', type=str, help='directory to save annotation files.') | |
args = parser.parse_args() | |
data_root = args.data_root | |
out_dir = args.out_dir | |
load_trainset(data_root, out_dir) | |
load_testset(data_root, out_dir, valid_only=True) | |