forked from vsergeev/luaradio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtop_spec.lua
162 lines (130 loc) · 5.18 KB
/
top_spec.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
local ffi = require('ffi')
local radio = require('radio')
local jigs = require('tests.jigs')
local buffer = require('tests.buffer')
local test_vectors = dofile('tests/top_vectors.gen.lua')
ffi.cdef[[
int pipe(int fildes[2]);
]]
describe("top level test", function ()
for _, multiprocess in pairs({true, false}) do
it("example " .. (multiprocess and "multiprocess" or "singleprocess"), function ()
--[[
[ Source ] -- [ Mul. Conj. ] -- [ LPF ] -- [ Freq. Discrim. ] -- [ Decimator ] -- [ Sink ]
|
|
[ Source ]
--]]
-- Prepare our source and sink file descriptors
local src1_fd = buffer.open(test_vectors.SRC1_TEST_VECTOR)
local src2_fd = buffer.open(test_vectors.SRC2_TEST_VECTOR)
local snk_fd = buffer.open()
-- Build the pipeline
local top = radio.CompositeBlock()
local src1 = radio.IQFileSource(src1_fd, 'f32le', 1000000)
local src2 = radio.IQFileSource(src2_fd, 'f32le', 1000000)
local b1 = radio.MultiplyConjugateBlock()
local b2 = radio.LowpassFilterBlock(16, 100e3)
local b3 = radio.FrequencyDiscriminatorBlock(5.0)
local b4 = radio.DecimatorBlock(25, {num_taps = 16})
local snk = radio.RawFileSink(snk_fd)
top:connect(src1, 'out', b1, 'in1')
top:connect(src2, 'out', b1, 'in2')
top:connect(b1, b2, b3, b4, snk)
top:run(multiprocess)
-- Rewind the sink buffer
buffer.rewind(snk_fd)
-- Read the sink buffer
local buf = buffer.read(snk_fd, #test_vectors.SNK_TEST_VECTOR*2)
assert.is.equal(#test_vectors.SNK_TEST_VECTOR, #buf)
-- Deserialize actual and expected test vectors
local actual = radio.types.Float32.deserialize(buf, #test_vectors.SNK_TEST_VECTOR/4)
local expected = radio.types.Float32.deserialize(test_vectors.SNK_TEST_VECTOR, #test_vectors.SNK_TEST_VECTOR/4)
jigs.assert_vector_equal(expected, actual, 1e-6)
end)
end
it("flow graph wait()", function ()
-- Create a pipe
local pipe_fds = ffi.new("int[2]")
assert(ffi.C.pipe(pipe_fds) == 0)
-- Build and start flow graph
local top = radio.CompositeBlock():connect(
radio.RawFileSource(pipe_fds[0], radio.types.Byte, 1),
radio.DelayBlock(10),
radio.PrintSink()
):start()
-- Close write end of pipe
assert(ffi.C.close(pipe_fds[1]) == 0)
-- Wait for flow graph to finish
top:wait()
-- Close read end of pipe
assert(ffi.C.close(pipe_fds[0]) == 0)
end)
it("flow graph stop()", function ()
-- Create a pipe
local pipe_fds = ffi.new("int[2]")
assert(ffi.C.pipe(pipe_fds) == 0)
-- Build and start flow graph
local top = radio.CompositeBlock():connect(
radio.RawFileSource(pipe_fds[0], radio.types.Byte, 1),
radio.DelayBlock(10),
radio.PrintSink()
):start()
-- Stop flow graph
top:stop()
-- Close pipe
assert(ffi.C.close(pipe_fds[1]) == 0)
assert(ffi.C.close(pipe_fds[0]) == 0)
end)
it("flow graph status()", function ()
-- Create a pipe
local pipe_fds = ffi.new("int[2]")
assert(ffi.C.pipe(pipe_fds) == 0)
-- Build and start flow graph
local top = radio.CompositeBlock():connect(
radio.RawFileSource(pipe_fds[0], radio.types.Byte, 1),
radio.DelayBlock(10),
radio.PrintSink()
):start()
-- Check running status
assert.is.equal(top:status().running, true)
-- Close write end of pipe
assert(ffi.C.close(pipe_fds[1]) == 0)
-- Wait for running status to trip
local tic = os.time()
while true do
if not top:status().running then
break
end
-- Timeout after 5 seconds
assert.is_true((os.time() - tic) < 5)
end
-- Close read end of pipe
assert(ffi.C.close(pipe_fds[0]) == 0)
end)
it("flow graph data integrity", function ()
local SRC_SIZE = 4*1048576
-- Create source and sink buffers
local src_fd = buffer.open()
local snk_fd = buffer.open()
-- Write random bytes to source
local f_random = io.open("/dev/urandom", "rb")
buffer.write(src_fd, f_random:read(SRC_SIZE))
f_random:close()
buffer.rewind(src_fd)
-- Create and run the pipeline
local top = radio.CompositeBlock():connect(
radio.RawFileSource(src_fd, radio.types.Byte, 1),
radio.RawFileSink(snk_fd)
):run()
-- Rewind buffers
buffer.rewind(src_fd)
buffer.rewind(snk_fd)
-- Compare buffers
local a = buffer.read(src_fd, SRC_SIZE*2)
local b = buffer.read(snk_fd, SRC_SIZE*2)
assert.is.equal(SRC_SIZE, #a)
assert.is.equal(#a, #b)
assert.is_true(a == b)
end)
end)