DynMM Test: separate MVAU and DynMVAU testing
aziz bahri committed Nov 29, 2024
1 parent 9589446 commit 129fe70
Showing 1 changed file with 41 additions and 116 deletions.
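This commit removes the dynamic-matmul (DynMM) branches from the shared MVAU test helpers, so the static-weight MVAU tests no longer switch on an `mvau` argument; DynMVAU is intended to get its own separate test. For orientation, a minimal sketch of the two graph-input layouts the old helper selected between, reconstructed from the deleted lines in the diff below (the concrete dimensions are hypothetical):

from onnx import TensorProto, helper

mw, mh = 32, 64  # example matrix dimensions (hypothetical)

# Static MVAU: one activation input; the weight matrix is a graph initializer.
inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, [1, mw])

# Dynamic matmul: both operands are runtime graph inputs, no weight initializer.
inp_A = helper.make_tensor_value_info("inp_A", TensorProto.FLOAT, [1, mw])
inp_B = helper.make_tensor_value_info("inp_B", TensorProto.FLOAT, [mw, mh])

graph_inputs_static = [inp]
graph_inputs_dynamic = [inp_A, inp_B]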
157 changes: 41 additions & 116 deletions tests/fpgadataflow/test_fpgadataflow_mvau.py
@@ -69,25 +69,9 @@
from finn.transformation.fpgadataflow.specialize_layers import SpecializeLayers


def make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T=None, tdt=None, inp_A=None, mvau="Weight"):
"""
Create a graph with a single MVAU Node.
TODO: AB: Currently reusing the old implementation of MVAU, update to use the new implementation.
assume weight input is input matrix B and derive matrix shape using weight matrix shape.
for matrix of size [MxN] * [NxK],
MAT A dimension variables:
M = N_VECTORS
N = MH
MAT B dimensions:
N = MH
K = MW
"""
mw = W.shape[0] # TODO: AB: Are these swapped for DynMM?
mh = W.shape[1] # TODO: AB: Are these swapped for DynMM?
n_vectors = inp_A.shape[0] if inp_A is not None else 1
def make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T=None, tdt=None):
mw = W.shape[0]
mh = W.shape[1]
assert mh % pe == 0
assert mw % simd == 0

@@ -107,13 +91,6 @@ def make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T=None, tdt=Non
binary_xnor_mode = 0

inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, [1, mw])
inp_A = helper.make_tensor_value_info("inp_A", TensorProto.FLOAT, [1, mw])
inp_B = helper.make_tensor_value_info("inp_B", TensorProto.FLOAT, [mw, mh])
if mvau == "dynamic":
graph_inp_list = [inp_A, inp_B]
else:
graph_inp_list = [inp]

outp = helper.make_tensor_value_info("outp", TensorProto.FLOAT, [1, mh])
if T is not None:
no_act = 0
@@ -122,11 +99,6 @@ def make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T=None, tdt=Non
actval = 0
else:
actval = odt.min()
elif mvau == "dynamic":
# dynamic matmul
node_inp_list = ["inp_A", "inp_B"]
actval = 0
no_act = 1
else:
# no thresholds
node_inp_list = ["inp", "weights"]
@@ -142,7 +114,6 @@ def make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T=None, tdt=Non
MH=mh,
SIMD=simd,
PE=pe,
N_VECTORS=n_vectors, # Height of Input Matrix A
inputDataType=export_idt.name,
weightDataType=export_wdt.name,
outputDataType=odt.name,
@@ -151,32 +122,23 @@ def make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T=None, tdt=Non
noActivation=no_act,
)
graph = helper.make_graph(
nodes=[FCLayer_node], name="fclayer_graph", inputs=graph_inp_list, outputs=[outp]
nodes=[FCLayer_node], name="fclayer_graph", inputs=[inp], outputs=[outp]
)

model = qonnx_make_model(graph, producer_name="fclayer-model")
model = ModelWrapper(model)

# Set IO Datatypes and Initializers
if wdt is None:
model.set_tensor_datatype("inp_A", idt)
model.set_tensor_datatype("inp_B", idt)
else:
model.set_tensor_datatype("inp", idt)
model.set_tensor_datatype("weights", wdt)
# Set Initializers
if binary_xnor_mode:
# convert bipolar to binary
model.set_initializer("weights", (W + 1) / 2)
else:
model.set_initializer("weights", W)

if T is not None:
model.set_tensor_datatype("thresh", tdt)
model.set_initializer("thresh", T)

model.set_tensor_datatype("inp", idt)
model.set_tensor_datatype("outp", odt)

model.set_tensor_datatype("weights", wdt)
if binary_xnor_mode:
# convert bipolar to binary
model.set_initializer("weights", (W + 1) / 2)
else:
model.set_initializer("weights", W)
if T is not None:
model.set_tensor_datatype("thresh", tdt)
model.set_initializer("thresh", T)
return model
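A minimal usage sketch of the simplified helper, using the datatypes and generators already imported by this test module (the shapes and folding factors are illustrative assumptions, not part of the diff):

# Hypothetical example: 32x64 INT4 weights, no thresholds, accumulator output.
W = gen_finn_dt_tensor(DataType["INT4"], (32, 64))  # (MW, MH)
model = make_single_fclayer_modelwrapper(
    W, pe=4, simd=8, wdt=DataType["INT4"], idt=DataType["INT4"], odt=DataType["INT32"]
)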


@@ -198,24 +160,17 @@ def make_single_matmul_modelwrapper(ifm, ofm, idt, wdt, W):
return model


def prepare_inputs(input_tensor, idt, wdt, inp_name="inp", inp_B=None):
def prepare_inputs(input_tensor, idt, wdt, inp_name="inp"):
if wdt == DataType["BIPOLAR"] and idt == DataType["BIPOLAR"]:
# convert bipolar to binary
return {inp_name: (input_tensor + 1) / 2}
elif wdt is None:
# dynamic matmul
if inp_B is None:
raise Exception("inp_B not provided for dynamic matmul")
else:
return {"inp_A": input_tensor, "inp_B": inp_B}
else:
return {inp_name: input_tensor}
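Both `prepare_inputs` and the reference computations in the tests below map bipolar {-1, +1} values to binary {0, 1} via (x + 1) / 2 so that XNOR-popcount arithmetic can reproduce a bipolar dot product. A self-contained numpy sketch of the identity being relied on (hypothetical values, independent of the FINN helpers):

import numpy as np

a = np.array([-1, 1, 1, -1])  # bipolar operands
b = np.array([1, 1, -1, -1])
a_bin = (a + 1) // 2  # map {-1, +1} -> {0, 1}
b_bin = (b + 1) // 2
matches = np.sum(a_bin == b_bin)  # XNOR-popcount: count agreeing positions
# bipolar dot product = matches - mismatches = 2 * matches - length
assert a @ b == 2 * matches - a.size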

# MVAU Type
@pytest.mark.parametrize("mvau", ["dynamic", "Weight"])

# activation: None or DataType
@pytest.mark.parametrize("act", [None, DataType["BIPOLAR"], DataType["INT4"]])
# weight datatype, if None then this is a dynamic matmul with two inputs
# weight datatype
@pytest.mark.parametrize("wdt", [DataType["BIPOLAR"], DataType["INT4"]])
# input datatype
@pytest.mark.parametrize("idt", [DataType["BIPOLAR"], DataType["INT4"]])
@@ -230,11 +185,7 @@ def prepare_inputs(input_tensor, idt, wdt, inp_name="inp", inp_B=None):
@pytest.mark.fpgadataflow
@pytest.mark.slow
@pytest.mark.vivado
def test_fpgadataflow_mvau_hwop(idt, wdt, act, nf, sf, mw, mh, mvau):
"""
Test the HWOP execution of the MVAU operation.
For a dynamic matmul with two input matrices, set wdt to None and provide inp_B as the second input matrix.
"""
def test_fpgadataflow_mvau_hwop(idt, wdt, act, nf, sf, mw, mh):
if nf == -1:
nf = mh
if sf == -1:
@@ -243,16 +194,10 @@ def test_fpgadataflow_mvau_hwop(idt, wdt, act, nf, sf, mw, mh, mvau):
simd = mw // sf
assert mh % pe == 0
assert mw % sf == 0

# generate inputs
inp_A = gen_finn_dt_tensor(idt, (1, mw))
if mvau == "dynamic":
# dynamic matmul second input
inp_B = gen_finn_dt_tensor(idt, (mw, mh))
else:
# This is the weight matrix
inp_B = gen_finn_dt_tensor(wdt, (mw, mh))

# generate weights
W = gen_finn_dt_tensor(wdt, (mw, mh))
# generate input data
x = gen_finn_dt_tensor(idt, (1, mw))
if act is None:
# no activation, produce accumulators
T = None
@@ -262,8 +207,6 @@ def test_fpgadataflow_mvau_hwop(idt, wdt, act, nf, sf, mw, mh, mvau):
else:
odt = DataType["INT32"]
else:
if mvau == "dynamic":
pytest.skip("Dynamic matmul not supported")
odt = act
(min, max) = calculate_signed_dot_prod_range(idt, wdt, mw)
n_steps = act.get_num_possible_values() - 1
@@ -278,14 +221,14 @@ def test_fpgadataflow_mvau_hwop(idt, wdt, act, nf, sf, mw, mh, mvau):
assert (T >= 0).all()
else:
tdt = DataType["INT32"]
model = make_single_fclayer_modelwrapper(inp_B, pe, simd, wdt, idt, odt, T, tdt)
model = make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T, tdt)
# prepare input data
input_dict = prepare_inputs(inp_A, idt, wdt, inp_B=inp_B)
input_dict = prepare_inputs(x, idt, wdt)
if wdt == DataType["BIPOLAR"] and idt == DataType["BIPOLAR"]:
# convert inputs to binary and use xnorpopcountmatmul
y = xp.xnorpopcountmatmul((inp_A + 1) / 2, (inp_B + 1) / 2)
y = xp.xnorpopcountmatmul((x + 1) / 2, (W + 1) / 2)
else:
y = np.matmul(inp_A, inp_B)
y = np.matmul(x, W)
if T is not None:
# y = multithreshold(y, T)
if act == DataType["BIPOLAR"]:
@@ -501,14 +444,12 @@ def test_fpgadataflow_mvau_rtlsim(mem_mode, idt, wdt, act, nf, sf, mw, mh):
assert exp_cycles != 0


# MVAU Type
@pytest.mark.parametrize("mvau", ["dynamic", "Weight"])
# mem_mode: internal_embedded or internal_decoupled
@pytest.mark.parametrize("mem_mode", ["internal_decoupled"])
# activation: None or DataType
@pytest.mark.parametrize("act", [None, DataType["INT4"]])
# weight datatype
@pytest.mark.parametrize("wdt", [None, DataType["INT4"]])
@pytest.mark.parametrize("wdt", [DataType["INT4"]])
# input datatype
@pytest.mark.parametrize("idt", [DataType["INT4"]])
# neuron folding, -1 is maximum possible
@@ -524,12 +465,10 @@ def test_fpgadataflow_mvau_rtlsim(mem_mode, idt, wdt, act, nf, sf, mw, mh):
@pytest.mark.fpgadataflow
@pytest.mark.vivado
def test_fpgadataflow_mvau_large_depth_decoupled_mode_rtlsim(
mem_mode, idt, wdt, act, nf, sf, mw, mh, preferred_impl_style, mvau
mem_mode, idt, wdt, act, nf, sf, mw, mh, preferred_impl_style
):
if preferred_impl_style == "rtl" and act is not None:
pytest.skip("RTL-MVAU doesn't support const mem mode or embedded activations")
if preferred_impl_style == "hls" and mvau == "dynamic":
pytest.skip("HLS-MVAU doesn't support dynamic matmul")
if nf == -1:
nf = mh
if sf == -1:
@@ -538,14 +477,10 @@ def test_fpgadataflow_mvau_large_depth_decoupled_mode_rtlsim(
simd = mw // sf
assert mh % pe == 0
assert mw % sf == 0
# generate inputs
inp_A = gen_finn_dt_tensor(idt, (1, mw))
if mvau == "dynamic":
# dynamic matmul second input
inp_B = gen_finn_dt_tensor(idt, (mw, mh))
else:
# This is the weight matrix
inp_B = gen_finn_dt_tensor(wdt, (mw, mh))
# generate weights
W = gen_finn_dt_tensor(wdt, (mw, mh))
# generate input data
x = gen_finn_dt_tensor(idt, (1, mw))
if act is None:
# no activation, produce accumulators
T = None
@@ -555,8 +490,6 @@ def test_fpgadataflow_mvau_large_depth_decoupled_mode_rtlsim(
else:
odt = DataType["INT32"]
else:
if mvau == "dynamic":
pytest.skip("Dynamic matmul not supported")
odt = act
(min, max) = calculate_signed_dot_prod_range(idt, wdt, mw)
n_steps = act.get_num_possible_values() - 1
@@ -571,8 +504,7 @@ def test_fpgadataflow_mvau_large_depth_decoupled_mode_rtlsim(
assert (T >= 0).all()
else:
tdt = DataType["INT32"]
model = make_single_fclayer_modelwrapper(inp_B, pe, simd, wdt, idt, odt, T, tdt, inp_A, mvau)
model.save("mvau_large_depth.onnx")
model = make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T, tdt)
for node in model.graph.node:
# lookup op_type in registry of CustomOps
inst = getCustomOp(node)
Expand All @@ -581,12 +513,12 @@ def test_fpgadataflow_mvau_large_depth_decoupled_mode_rtlsim(
inst.set_nodeattr("preferred_impl_style", preferred_impl_style)

# prepare input data
input_dict = prepare_inputs(inp_A, idt, wdt, inp_B=inp_B)
input_dict = prepare_inputs(x, idt, wdt)
if wdt == DataType["BIPOLAR"] and idt == DataType["BIPOLAR"]:
# convert inputs to binary and use xnorpopcountmatmul
y = xp.xnorpopcountmatmul((inp_A + 1) / 2, (inp_B + 1) / 2)
y = xp.xnorpopcountmatmul((x + 1) / 2, (W + 1) / 2)
else:
y = np.matmul(inp_A, inp_B)
y = np.matmul(x, W)
if T is not None:
y = multithreshold(y, T)
if act == DataType["BIPOLAR"]:
@@ -600,10 +532,8 @@ def test_fpgadataflow_mvau_large_depth_decoupled_mode_rtlsim(
# TODO split up into several dependent tests -- need to check how this
# works for parametrized tests...
model = model.transform(SpecializeLayers("xczu7ev-ffvc1156-2-e"))
model.save("mvau_large_depth.onnx")
if mvau == "weight":
model = model.transform(MinimizeWeightBitWidth())
model = model.transform(MinimizeAccumulatorWidth())
model = model.transform(MinimizeWeightBitWidth())
model = model.transform(MinimizeAccumulatorWidth())
model = model.transform(SetExecMode("rtlsim"))
model = model.transform(GiveUniqueNodeNames())
model = model.transform(PrepareIP("xczu7ev-ffvc1156-2-e", 5))
@@ -661,13 +591,8 @@ def test_mvau_fifocharacterize_rtlsim(
simd = mw // sf
assert mh % pe == 0
assert mw % sf == 0
# generate inputs
if wdt is None:
# dynamic matmul second input
inp_B = gen_finn_dt_tensor(idt, (mw, mh))
else:
# This is the weight matrix
inp_B = gen_finn_dt_tensor(wdt, (mw, mh))
# generate weights
W = gen_finn_dt_tensor(wdt, (mw, mh))

# no activation, produce accumulators
T = None
@@ -677,7 +602,7 @@ def test_mvau_fifocharacterize_rtlsim(
else:
odt = DataType["INT32"]

model = make_single_fclayer_modelwrapper(inp_B, pe, simd, wdt, idt, odt, T, tdt)
model = make_single_fclayer_modelwrapper(W, pe, simd, wdt, idt, odt, T, tdt)
for node in model.graph.node:
# lookup op_type in registry of CustomOps
inst = getCustomOp(node)
