-
Notifications
You must be signed in to change notification settings - Fork 7
/
flow_io.py
executable file
·150 lines (116 loc) · 3.87 KB
/
flow_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#! /usr/bin/env python2
"""
I/O script to save and load the data coming with the MPI-Sintel low-level
computer vision benchmark.
For more details about the benchmark, please visit www.mpi-sintel.de
CHANGELOG:
v1.0 (2015/02/03): First release
Copyright (c) 2015 Jonas Wulff
Max Planck Institute for Intelligent Systems, Tuebingen, Germany
"""
# Requirements: Numpy as PIL/Pillow
import numpy as np
try:
import png
has_png = True
except:
has_png = False
png=None
# Check for endianness, based on Daniel Scharstein's optical flow code.
# Using little-endian architecture, these two should be equal.
TAG_FLOAT = 202021.25
TAG_CHAR = 'PIEH'.encode()
def flow_read(filename, return_validity=False):
""" Read optical flow from file, return (U,V) tuple.
Original code by Deqing Sun, adapted from Daniel Scharstein.
"""
f = open(filename,'rb')
check = np.fromfile(f,dtype=np.float32,count=1)[0]
assert check == TAG_FLOAT, ' flow_read:: Wrong tag in flow file (should be: {0}, is: {1}). Big-endian machine? '.format(TAG_FLOAT,check)
width = np.fromfile(f,dtype=np.int32,count=1)[0]
height = np.fromfile(f,dtype=np.int32,count=1)[0]
size = width*height
assert width > 0 and height > 0 and size > 1 and size < 100000000, ' flow_read:: Wrong input size (width = {0}, height = {1}).'.format(width,height)
tmp = np.fromfile(f,dtype=np.float32,count=-1).reshape((height,width*2))
u = tmp[:,np.arange(width)*2]
v = tmp[:,np.arange(width)*2 + 1]
if return_validity:
valid = u<1e19
u[valid==0] = 0
v[valid==0] = 0
return u,v,valid
else:
return u,v
def flow_write(filename,uv,v=None):
""" Write optical flow to file.
If v is None, uv is assumed to contain both u and v channels,
stacked in depth.
Original code by Deqing Sun, adapted from Daniel Scharstein.
"""
nBands = 2
if v is None:
uv_ = np.array(uv)
assert(uv_.ndim==3)
if uv_.shape[0] == 2:
u = uv_[0,:,:]
v = uv_[1,:,:]
elif uv_.shape[2] == 2:
u = uv_[:,:,0]
v = uv_[:,:,1]
else:
raise UVError('Wrong format for flow input')
else:
u = uv
assert(u.shape == v.shape)
height,width = u.shape
f = open(filename,'wb')
# write the header
f.write(TAG_CHAR)
np.array(width).astype(np.int32).tofile(f)
np.array(height).astype(np.int32).tofile(f)
# arrange into matrix form
tmp = np.zeros((height, width*nBands))
tmp[:,np.arange(width)*2] = u
tmp[:,np.arange(width)*2 + 1] = v
tmp.astype(np.float32).tofile(f)
f.close()
def flow_read_png(fpath):
"""
Read KITTI optical flow, returns u,v,valid mask
"""
if not has_png:
print('Error. Please install the PyPNG library')
return
R = png.Reader(fpath)
width,height,data,_ = R.asDirect()
# This only worked with python2.
#I = np.array(map(lambda x:x,data)).reshape((height,width,3))
I = np.array([x for x in data]).reshape((height,width,3))
u_ = I[:,:,0]
v_ = I[:,:,1]
valid = I[:,:,2]
u = (u_.astype('float64')-2**15)/64.0
v = (v_.astype('float64')-2**15)/64.0
return u,v,valid
def flow_write_png(fpath,u,v,valid=None):
"""
Write KITTI optical flow.
"""
if not has_png:
print('Error. Please install the PyPNG library')
return
if valid==None:
valid_ = np.ones(u.shape,dtype='uint16')
else:
valid_ = valid.astype('uint16')
u = u.astype('float64')
v = v.astype('float64')
u_ = ((u*64.0)+2**15).astype('uint16')
v_ = ((v*64.0)+2**15).astype('uint16')
I = np.dstack((u_,v_,valid_))
W = png.Writer(width=u.shape[1],
height=u.shape[0],
bitdepth=16,
planes=3)
with open(fpath,'wb') as fil:
W.write(fil,I.reshape((-1,3*u.shape[1])))