-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
329 lines (273 loc) · 11.6 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import os
import os.path as osp
import shutil
import warnings
import cv2
import mmcv
import numpy as np
import torch
from scipy.optimize import linear_sum_assignment
from pyskl.apis import inference_recognizer, init_recognizer
try:
from mmdet.apis import inference_detector, init_detector
except (ImportError, ModuleNotFoundError):
def inference_detector(*args, **kwargs):
pass
def init_detector(*args, **kwargs):
pass
warnings.warn(
'Failed to import `inference_detector` and `init_detector` from `mmdet.apis`. '
'Make sure you can successfully import these if you want to use related features. '
)
try:
from mmpose.apis import init_pose_model, inference_top_down_pose_model, vis_pose_result
except (ImportError, ModuleNotFoundError):
def init_pose_model(*args, **kwargs):
pass
def inference_top_down_pose_model(*args, **kwargs):
pass
def vis_pose_result(*args, **kwargs):
pass
warnings.warn(
'Failed to import `init_pose_model`, `inference_top_down_pose_model`, `vis_pose_result` from '
'`mmpose.apis`. Make sure you can successfully import these if you want to use related features. '
)
try:
import moviepy.editor as mpy
except ImportError:
raise ImportError('Please install moviepy to enable output file')
FONTFACE = cv2.FONT_HERSHEY_DUPLEX
FONTSCALE = 0.75
FONTCOLOR = (255, 255, 255) # BGR, white
THICKNESS = 1
LINETYPE = 1
def parse_args():
parser = argparse.ArgumentParser(description='PoseC3D demo')
parser.add_argument('video', help='video file/url')
parser.add_argument('out_filename', help='output filename')
parser.add_argument(
'--config',
default='configs/posec3d/slowonly_r50_ntu120_xsub/joint.py',
help='skeleton action recognition config file path')
parser.add_argument(
'--checkpoint',
default=('https://download.openmmlab.com/mmaction/pyskl/ckpt/'
'posec3d/slowonly_r50_ntu120_xsub/joint.pth'),
help='skeleton action recognition checkpoint file/url')
parser.add_argument(
'--det-config',
default='demo/faster_rcnn_r50_fpn_2x_coco.py',
help='human detection config file path (from mmdet)')
parser.add_argument(
'--det-checkpoint',
default=('http://download.openmmlab.com/mmdetection/v2.0/faster_rcnn/'
'faster_rcnn_r50_fpn_2x_coco/faster_rcnn_r50_fpn_2x_coco_'
'bbox_mAP-0.384_20200504_210434-a5d8aa15.pth'),
help='human detection checkpoint file/url')
parser.add_argument(
'--pose-config',
default='demo/hrnet_w32_coco_256x192.py',
help='human pose estimation config file path (from mmpose)')
parser.add_argument(
'--pose-checkpoint',
default=('https://download.openmmlab.com/mmpose/top_down/hrnet/'
'hrnet_w32_coco_256x192-c78dce93_20200708.pth'),
help='human pose estimation checkpoint file/url')
parser.add_argument(
'--det-score-thr',
type=float,
default=0.9,
help='the threshold of human detection score')
parser.add_argument(
'--label-map',
default='tools/data/label_map/nturgbd_120.txt',
help='label map file')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--short-side',
type=int,
default=480,
help='specify the short-side length of the image')
args = parser.parse_args()
return args
def frame_extraction(video_path, short_side):
"""Extract frames given video_path.
Args:
video_path (str): The video_path.
"""
# Load the video, extract frames into ./tmp/video_name
target_dir = osp.join('./tmp', osp.basename(osp.splitext(video_path)[0]))
os.makedirs(target_dir, exist_ok=True)
# Should be able to handle videos up to several hours
frame_tmpl = osp.join(target_dir, 'img_{:06d}.jpg')
vid = cv2.VideoCapture(video_path)
frames = []
frame_paths = []
flag, frame = vid.read()
cnt = 0
new_h, new_w = None, None
while flag:
if new_h is None:
h, w, _ = frame.shape
new_w, new_h = mmcv.rescale_size((w, h), (short_side, np.Inf))
frame = mmcv.imresize(frame, (new_w, new_h))
frames.append(frame)
frame_path = frame_tmpl.format(cnt + 1)
frame_paths.append(frame_path)
cv2.imwrite(frame_path, frame)
cnt += 1
flag, frame = vid.read()
return frame_paths, frames
def detection_inference(args, frame_paths):
"""Detect human boxes given frame paths.
Args:
args (argparse.Namespace): The arguments.
frame_paths (list[str]): The paths of frames to do detection inference.
Returns:
list[np.ndarray]: The human detection results.
"""
model = init_detector(args.det_config, args.det_checkpoint, args.device)
assert model is not None, ('Failed to build the detection model. Check if you have installed mmcv-full properly. '
'Note that you should first install mmcv-full successfully, then install mmdet, mmpose. ')
assert model.CLASSES[0] == 'person', 'We require you to use a detector trained on COCO'
results = []
print('Performing Human Detection for each frame')
prog_bar = mmcv.ProgressBar(len(frame_paths))
for frame_path in frame_paths:
result = inference_detector(model, frame_path)
# We only keep human detections with score larger than det_score_thr
result = result[0][result[0][:, 4] >= args.det_score_thr]
results.append(result)
prog_bar.update()
return results
def pose_inference(args, frame_paths, det_results):
model = init_pose_model(args.pose_config, args.pose_checkpoint,
args.device)
ret = []
print('Performing Human Pose Estimation for each frame')
prog_bar = mmcv.ProgressBar(len(frame_paths))
for f, d in zip(frame_paths, det_results):
# Align input format
d = [dict(bbox=x) for x in list(d)]
pose = inference_top_down_pose_model(model, f, d, format='xyxy')[0]
ret.append(pose)
prog_bar.update()
return ret
def dist_ske(ske1, ske2):
dist = np.linalg.norm(ske1[:, :2] - ske2[:, :2], axis=1) * 2
diff = np.abs(ske1[:, 2] - ske2[:, 2])
return np.sum(np.maximum(dist, diff))
def pose_tracking(pose_results, max_tracks=2, thre=30):
tracks, num_tracks = [], 0
num_joints = None
for idx, poses in enumerate(pose_results):
if len(poses) == 0:
continue
if num_joints is None:
num_joints = poses[0].shape[0]
track_proposals = [t for t in tracks if t['data'][-1][0] > idx - thre]
n, m = len(track_proposals), len(poses)
scores = np.zeros((n, m))
for i in range(n):
for j in range(m):
scores[i][j] = dist_ske(track_proposals[i]['data'][-1][1], poses[j])
row, col = linear_sum_assignment(scores)
for r, c in zip(row, col):
track_proposals[r]['data'].append((idx, poses[c]))
if m > n:
for j in range(m):
if j not in col:
num_tracks += 1
new_track = dict(data=[])
new_track['track_id'] = num_tracks
new_track['data'] = [(idx, poses[j])]
tracks.append(new_track)
tracks.sort(key=lambda x: -len(x['data']))
result = np.zeros((max_tracks, len(pose_results), num_joints, 3), dtype=np.float16)
for i, track in enumerate(tracks[:max_tracks]):
for item in track['data']:
idx, pose = item
result[i, idx] = pose
return result[..., :2], result[..., 2]
def main():
args = parse_args()
frame_paths, original_frames = frame_extraction(args.video,
args.short_side)
num_frame = len(frame_paths)
h, w, _ = original_frames[0].shape
config = mmcv.Config.fromfile(args.config)
config.data.test.pipeline = [x for x in config.data.test.pipeline if x['type'] != 'DecompressPose']
# Are we using GCN for Infernece?
GCN_flag = 'GCN' in config.model.type
GCN_nperson = None
if GCN_flag:
format_op = [op for op in config.data.test.pipeline if op['type'] == 'FormatGCNInput'][0]
GCN_nperson = format_op['num_person']
model = init_recognizer(config, args.checkpoint, args.device)
# Load label_map
label_map = [x.strip() for x in open(args.label_map).readlines()]
# Get Human detection results
det_results = detection_inference(args, frame_paths)
torch.cuda.empty_cache()
pose_results = pose_inference(args, frame_paths, det_results)
torch.cuda.empty_cache()
fake_anno = dict(
frame_dir='',
label=-1,
img_shape=(h, w),
original_shape=(h, w),
start_index=0,
modality='Pose',
total_frames=num_frame)
if GCN_flag:
# We will keep at most `GCN_nperson` persons per frame.
tracking_inputs = [[pose['keypoints'] for pose in poses] for poses in pose_results]
keypoint, keypoint_score = pose_tracking(tracking_inputs, max_tracks=GCN_nperson)
fake_anno['keypoint'] = keypoint
fake_anno['keypoint_score'] = keypoint_score
else:
num_person = max([len(x) for x in pose_results])
# Current PoseC3D models are trained on COCO-keypoints (17 keypoints)
num_keypoint = 17
keypoint = np.zeros((num_person, num_frame, num_keypoint, 2),
dtype=np.float16)
keypoint_score = np.zeros((num_person, num_frame, num_keypoint),
dtype=np.float16)
for i, poses in enumerate(pose_results):
for j, pose in enumerate(poses):
pose = pose['keypoints']
keypoint[j, i] = pose[:, :2]
keypoint_score[j, i] = pose[:, 2]
fake_anno['keypoint'] = keypoint
fake_anno['keypoint_score'] = keypoint_score
results = inference_recognizer(model, fake_anno)
action_label = label_map[results[0][0]]
pose_model = init_pose_model(args.pose_config, args.pose_checkpoint,
args.device)
vis_frames = [
vis_pose_result(pose_model, frame_paths[i], pose_results[i])
for i in range(num_frame)
]
for frame in vis_frames:
cv2.putText(frame, action_label, (10, 30), FONTFACE, FONTSCALE,
FONTCOLOR, THICKNESS, LINETYPE)
print(vis_frames[0].shape)
print(vis_frames[1].shape)
frameSize = (500, 500)
out = cv2.VideoWriter('output_video.avi',cv2.VideoWriter_fourcc(*'DIVX'), 24, frameSize)
for i in range(len(vis_frames)):
fileName = 'output/' + str(i) + '.jpg'
cv2.imwrite(fileName, vis_frames[i])
for i in range(len(vis_frames)):
img = cv2.resize(vis_frames[i], frameSize, interpolation = cv2.INTER_AREA)
out.write(img)
out.release()
# vid = mpy.ImageSequenceClip([x[:, :, ::-1] for x in vis_frames], fps=24)
# vid.write_videofile(args.out_filename, remove_temp=True)
# tmp_frame_dir = osp.dirname(frame_paths[0])
# shutil.rmtree(tmp_frame_dir)
if __name__ == '__main__':
main()