# dataLoader.py
import copy
from torch.utils import data
import torch as t
from torch import nn
import open3d as o3d
import numpy as np
from numpy import random as rd
from colors import COLOR_MAP
import os
"""
Expected data layout: each split is a directory that directly contains the
point-cloud files.

train_data_dir/
    pcd1.ply
    pcd2.ply
    ......
valid_data_dir/
    pcd1.ply
    pcd2.ply
    ......
"""
class MySet(data.Dataset):
    """Dataset of synthetic point-cloud registration samples.

    For every file in ``data_dir`` an item is produced by:

    1. loading the point cloud,
    2. rigidly transforming it with a random rotation/translation (target),
    3. adding Gaussian noise to an untransformed copy (source),
    4. down-sampling both clouds and computing FPFH features,
    5. running feature-based RANSAC registration to obtain correspondences,
    6. sampling ``select_point_count`` correspondences and labelling each one
       by thresholding the distance between its two FPFH descriptors.
    """

    # Upper bound on how often one item is regenerated when RANSAC returns no
    # correspondences, so a degenerate file raises instead of hanging a worker.
    max_registration_attempts = 50

    def __init__(self, data_dir, voxel_size, R_range, t_range, select_point_count, noise_strength, feature_distance_tresh):
        """
        :param data_dir: directory whose entries are the point-cloud (ply) files
        :param voxel_size: voxel size used to down-sample each point cloud
        :param R_range: (low, high) range of each component of the random axis-angle rotation vector
        :param t_range: (low, high) range of each component of the random translation vector
        :param select_point_count: number of correspondences sampled per item
        :param noise_strength: float, strength of the Gaussian noise added to the source cloud;
            0 disables the noise
        :param feature_distance_tresh: feature distance threshold; correspondences whose FPFH
            distance is <= this value are labelled 1 (inlier), all others 0 (outlier)
        """
        super().__init__()
        self.voxel_size = voxel_size
        self.R_range = R_range
        self.t_range = t_range
        self.select_point_count = select_point_count
        # os.listdir returns entries in arbitrary order; sort so that a given
        # index always maps to the same file across runs and machines.
        self.pcd_pths = [os.path.join(data_dir, name) for name in sorted(os.listdir(data_dir))]
        self.noise_strength = noise_strength
        self.feature_distance_thresh = feature_distance_tresh

    def __getitem__(self, index):
        """
        Build one training sample from the ``index``-th file.

        :param index: position in ``self.pcd_pths``
        :return: tuple ``(d, class_label, feature_dist, R, t_vec)`` where ``d`` is a float tensor
            holding the source coordinates followed by the target coordinates of every sampled
            correspondence, ``class_label`` is the float inlier label per correspondence,
            ``feature_dist`` is the numpy array of FPFH distances, ``R`` is the rotation matrix
            and ``t_vec`` the translation reshaped to ``(3,)``
        :raises IndexError: if ``index`` is out of range
        :raises RuntimeError: if no correspondence is found within ``max_registration_attempts`` tries
        """
        pcd_pth = self.pcd_pths[index]
        for _ in range(self.max_registration_attempts):
            pcd = self.load_one_pcd(pcd_pth)
            # Copy before transforming: random_transform hands `pcd` to rotate/translate,
            # and the source must stay in the original pose.
            source = copy.deepcopy(pcd)
            target, R, t_vec = self.random_transform(pcd)  # target = R * pcd + t_vec
            source = self.add_gauss_noise_to_pcd(source)  # add noise to source
            target_down, target_down_fpfh = self.preprocess_point_cloud(target)
            source_down, source_down_fpfh = self.preprocess_point_cloud(source)
            fpfh_regis_result = self.execute_global_registration(
                source_down, target_down, source_down_fpfh, target_down_fpfh)
            match_indices = np.asarray(fpfh_regis_result.correspondence_set)
            if match_indices.shape[0] > 0:
                break
        else:
            raise RuntimeError(
                "no correspondences found for %r after %d registration attempts"
                % (pcd_pth, self.max_registration_attempts))

        target_down_array = np.asarray(target_down.points)
        source_down_array = np.asarray(source_down.points)
        # Transposed so that each row is the descriptor of one point.
        target_down_fpfh_array = np.asarray(target_down_fpfh.data).transpose()
        source_down_fpfh_array = np.asarray(source_down_fpfh.data).transpose()
        target_select, source_select, target_fpfh, source_fpfh = self.select_point_feature(
            target_down_array, source_down_array,
            target_down_fpfh_array, source_down_fpfh_array, match_indices)
        feature_dist = self.calc_feature_dist(target_fpfh, source_fpfh)
        class_label = feature_dist <= self.feature_distance_thresh
        d = t.tensor(np.concatenate([source_select, target_select], axis=1)).type(t.FloatTensor)
        class_label = t.tensor(class_label).type(t.FloatTensor)
        return d, class_label, feature_dist, R, t_vec.view((3,))

    def __len__(self):
        """
        :return: number of files found in the data directory
        """
        return len(self.pcd_pths)

    def select_point_feature(self, target_down_array, source_down_array, target_down_fpfh_array, source_down_fpfh_array, match_indices):
        """
        Randomly sample ``select_point_count`` correspondences (with replacement).

        :param target_down_array: target points, one row per point
        :param source_down_array: source points, one row per point
        :param target_down_fpfh_array: target descriptors, one row per point
        :param source_down_fpfh_array: source descriptors, one row per point
        :param match_indices: integer array with one row per correspondence; column 0 indexes
            the source arrays and column 1 indexes the target arrays
        :return: ``(target_select, source_select, target_fpfh, source_fpfh)``
        """
        chosen_rows = rd.choice(np.arange(match_indices.shape[0]), self.select_point_count, replace=True)
        select_indices = match_indices[chosen_rows, :]
        source_idx = select_indices[:, 0]
        target_idx = select_indices[:, 1]
        target_select = target_down_array[target_idx, :]
        source_select = source_down_array[source_idx, :]
        target_fpfh = target_down_fpfh_array[target_idx, :]
        source_fpfh = source_down_fpfh_array[source_idx, :]
        return target_select, source_select, target_fpfh, source_fpfh

    def calc_feature_dist(self, target_fpfh, source_fpfh):
        """
        :param target_fpfh: target descriptors, one row per correspondence
        :param source_fpfh: source descriptors, same shape as ``target_fpfh``
        :return: row-wise Euclidean distance between the two descriptor arrays
        """
        feature_dist = np.sqrt(np.sum((target_fpfh - source_fpfh) ** 2, axis=1))
        return feature_dist

    def add_gauss_noise_to_pcd(self, pcd):
        """
        Add zero-mean Gaussian noise to the points of ``pcd``.

        The noise power is derived from the mean signal power and a signal-to-noise
        ratio of ``10 / noise_strength`` dB.

        :param pcd: point cloud data
        :return: a new point cloud holding only the noisy points, or ``pcd`` itself when
            ``noise_strength`` is 0
        """
        if self.noise_strength == 0:
            # No noise requested; also avoids dividing by zero when computing the SNR.
            return pcd
        signal = np.asarray(pcd.points)
        SNR = 1 / self.noise_strength * 10  # SNR in dB for the requested noise strength
        noise = np.random.randn(signal.shape[0], signal.shape[1])
        noise = noise - np.mean(noise)
        signal_power = np.linalg.norm(signal) ** 2 / signal.size
        noise_variance = signal_power / np.power(10, (SNR / 10))
        # Rescale the sampled noise so that its standard deviation matches the target variance.
        noise = (np.sqrt(noise_variance) / np.std(noise)) * noise
        signal_noise = noise + signal
        pcd_noise = o3d.geometry.PointCloud()
        pcd_noise.points = o3d.utility.Vector3dVector(signal_noise)
        return pcd_noise

    def load_one_pcd(self, pcd_pth):
        """
        :param pcd_pth: path of the point-cloud file
        :return: point cloud read by Open3D
        """
        pcd = o3d.io.read_point_cloud(pcd_pth)
        return pcd

    def preprocess_point_cloud(self, pcd):
        """
        Down-sample ``pcd``, estimate normals and compute FPFH features.

        :param pcd: point cloud data
        :return: ``(pcd_down, pcd_fpfh)``, the down-sampled cloud and its FPFH feature
        """
        pcd_down = pcd.voxel_down_sample(self.voxel_size)
        # Search radii are fixed multiples of the voxel size.
        radius_normal = self.voxel_size * 2
        pcd_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))
        radius_feature = self.voxel_size * 5
        pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
            pcd_down,
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
        return pcd_down, pcd_fpfh

    def pcd2tensor(self, pcd):
        """
        :param pcd: point cloud data
        :return: float tensor holding the points of ``pcd``
        """
        points = np.asarray(pcd.points)
        pcd_tensor = t.tensor(points).type(t.FloatTensor)
        return pcd_tensor

    def show_pcd(self, pcd_list):
        """
        Paint every cloud with its own colour from ``COLOR_MAP`` and display them together.

        :param pcd_list: list of pcd
        :return:
        """
        for i, pcd in enumerate(pcd_list):
            pcd.paint_uniform_color(COLOR_MAP[i])
        o3d.visualization.draw_geometries(pcd_list, "pcd")

    def random_generate_R_t(self):
        """
        random generate rotation matrix and translate vector

        :return: ``(R, t_vec)``: float tensors, the rotation matrix built from a random
            axis-angle vector and the random translation of shape (3, 1)
        """
        random_lie_param = rd.uniform(self.R_range[0], self.R_range[1], (3, 1))
        R = t.tensor(o3d.geometry.get_rotation_matrix_from_axis_angle(random_lie_param)).type(t.FloatTensor)  # rotation matrix
        t_vec = t.tensor(rd.uniform(self.t_range[0], self.t_range[1], (3, 1))).type(t.FloatTensor)
        return R, t_vec

    def rotate_pcd(self, pcd, R):
        """
        Rotate ``pcd`` about the origin.

        :param pcd: point cloud data
        :param R: rotation matrix, shape is (3, 3)
        :return: result of ``pcd.rotate`` (presumably ``pcd`` itself, rotated in place;
            confirm against the Open3D version in use)
        """
        pcd_after_rotate = pcd.rotate(R, center=(0, 0, 0))
        return pcd_after_rotate

    def translate_pcd(self, pcd, t):
        """
        :param pcd: point cloud data
        :param t: translate vector, shape is (3,); note that inside this method the name
            shadows the module-level torch alias ``t``
        :return: result of ``pcd.translate``
        """
        pcd_after_translate = pcd.translate(t)
        return pcd_after_translate

    def random_transform(self, pcd):
        """
        Apply a random rotation followed by a random translation to ``pcd``.

        :param pcd: point cloud data
        :return: ``(transformed_pcd, R, t_vec)``
        """
        R, t_vec = self.random_generate_R_t()
        pcd_after_rotate = self.rotate_pcd(pcd, R)
        pcd_after_translate = self.translate_pcd(pcd_after_rotate, t_vec)
        return pcd_after_translate, R, t_vec

    def execute_global_registration(self, source_down, target_down, source_fpfh, target_fpfh):
        """
        Run Open3D's feature-matching RANSAC registration on the down-sampled clouds.

        :param source_down: down-sampled source cloud
        :param target_down: down-sampled target cloud
        :param source_fpfh: FPFH feature of the source cloud
        :param target_fpfh: FPFH feature of the target cloud
        :return: Open3D registration result; ``__getitem__`` reads its ``correspondence_set``
        """
        # The distance threshold is a fixed multiple of the down-sampling voxel size.
        distance_threshold = self.voxel_size * 1.5
        result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
            source_down, target_down, source_fpfh, target_fpfh, True, distance_threshold,
            o3d.pipelines.registration.TransformationEstimationPointToPoint(False), 3, [
                o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
                o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                    distance_threshold)
            ],
            o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999))
        return result
def find_feature_dist_thresh(pcd_dir, voxel_size, R_range, t_range, select_point_count, noise_strength):
    """
    Estimate a feature distance threshold for a directory of point clouds.

    Every item of a ``MySet`` built over ``pcd_dir`` is generated once; the median of the
    item's feature distances is recorded, and the mean of those medians is returned.

    :param pcd_dir: ply data dir
    :param voxel_size: voxel size forwarded to ``MySet``
    :param R_range: rotation range forwarded to ``MySet``
    :param t_range: translation range forwarded to ``MySet``
    :param select_point_count: correspondence count forwarded to ``MySet``
    :param noise_strength: noise strength forwarded to ``MySet``
    :return: mean over all items of the per-item median feature distance
    """
    # The threshold passed here (0.1) only affects the labels, which are ignored below.
    dataset = MySet(pcd_dir, voxel_size, R_range, t_range, select_point_count, noise_strength, 0.1)
    per_item_medians = [np.median(feature_dist) for _, _, feature_dist, _, _ in dataset]
    return np.mean(per_item_medians)
def make_loader(data_dir, voxel_size, R_range, t_range, select_point_count, noise_strength, feature_distance_tresh, batch_size, num_workers):
    """
    Build a shuffled batch iterator over a ``MySet`` dataset.

    :param data_dir: ply data dir
    :param voxel_size: voxel size forwarded to ``MySet``
    :param R_range: rotation range forwarded to ``MySet``
    :param t_range: translation range forwarded to ``MySet``
    :param select_point_count: correspondence count forwarded to ``MySet``
    :param noise_strength: noise strength forwarded to ``MySet``
    :param feature_distance_tresh: feature distance threshold forwarded to ``MySet``
    :param batch_size: number of items per batch
    :param num_workers: number of DataLoader worker processes
    :return: an iterator (not the DataLoader itself) over shuffled batches, with the last
        incomplete batch dropped
    """
    dataset = MySet(data_dir, voxel_size, R_range, t_range, select_point_count, noise_strength, feature_distance_tresh)
    batch_source = data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
        num_workers=num_workers,
    )
    # Callers receive the iterator directly, so it yields a single pass over the data.
    return iter(batch_source)
if __name__ == "__main__":
    # Manual smoke test: build the dataset over a local directory and generate
    # one item; the result is discarded.
    s = MySet(
        data_dir=r"F:\data\shapenet_data\valid",
        voxel_size=0.01,
        R_range=[-0.2, 0.2],
        t_range=[-0.1, 0.1],
        select_point_count=3000,
        noise_strength=0.12,
        feature_distance_tresh=45,
    )
    s[500]