Skip to content

Commit

Permalink
Formatting of CFG Block methods
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcere committed Oct 7, 2024
1 parent 4b25a87 commit 5411f55
Showing 1 changed file with 73 additions and 76 deletions.
149 changes: 73 additions & 76 deletions src/parser/cfg_block.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from global_params.types import instr_id_T
from parser.cfg_instruction import CFGInstruction, build_push_spec, build_pushtag_spec
from parser.utils_parser import is_in_input_stack, is_in_output_stack, are_dependent_interval, get_empty_spec, \
get_expression, are_dependent_accesses, replace_pos_instrsid, generate_dep, get_interval
Expand All @@ -16,51 +17,50 @@
function_tags = {}



def include_function_call_tags(ins, out_idx, block_spec):
global function_tags
global tag_idx

in_tag, out_tag = function_tags.get(ins.get_op_name(), (-1,-1))
in_tag, out_tag = function_tags.get(ins.get_op_name(), (-1, -1))

if in_tag == -1 and out_tag == -1:
out_tag = tag_idx
in_tag = tag_idx+1
tag_idx+=2
in_tag = tag_idx + 1
tag_idx += 2

function_tags[ins.get_op_name()] = (in_tag, out_tag)

in_tag_instr = build_pushtag_spec(out_idx, in_tag)
out_idx+=1
out_idx += 1

out_tag_instr = build_pushtag_spec(out_idx, out_tag)

block_spec["user_instrs"]+=[in_tag_instr,out_tag_instr]
block_spec["user_instrs"] += [in_tag_instr, out_tag_instr]

#It adds the out jump label after the arguments of the function
# It adds the out jump label after the arguments of the function
num_funct_arguments = len(ins.get_in_args())
block_spec["tgt_ws"] = block_spec["tgt_ws"][:num_funct_arguments]+out_tag_instr["outpt_sk"]+block_spec["tgt_ws"][num_funct_arguments:]

#It adds at top of the stack de input jump label
block_spec["tgt_ws"] = in_tag_instr["outpt_sk"]+block_spec["tgt_ws"]
block_spec["tgt_ws"] = block_spec["tgt_ws"][:num_funct_arguments] + out_tag_instr["outpt_sk"] + block_spec[
"tgt_ws"][
num_funct_arguments:]

# It adds at top of the stack de input jump label
block_spec["tgt_ws"] = in_tag_instr["outpt_sk"] + block_spec["tgt_ws"]

#It adds in variables the new identifier for the in and out jump label
block_spec["variables"]+=in_tag_instr["outpt_sk"]+ out_tag_instr["outpt_sk"]
# It adds in variables the new identifier for the in and out jump label
block_spec["variables"] += in_tag_instr["outpt_sk"] + out_tag_instr["outpt_sk"]

block_spec["yul_expressions"]+="\n"+ins.get_instruction_representation()
block_spec["yul_expressions"] += "\n" + ins.get_instruction_representation()

return block_spec, out_idx




class CFGBlock:
"""
Class for representing a cfg block
"""

def __init__(self, identifier : str, instructions: List[CFGInstruction], type_block: str, assignment_dict: Dict[str, str]):
def __init__(self, identifier: str, instructions: List[CFGInstruction], type_block: str,
assignment_dict: Dict[str, str]):
self.block_id = identifier
self._instructions = instructions
# minimum size of the source stack
Expand Down Expand Up @@ -123,12 +123,12 @@ def set_instructions(self, new_instructions: List[CFGInstruction]) -> None:

# Then we update the source stack size
# TODO
#self.source_stack = utils.compute_stack_size(map(lambda x: x.disasm, self.instructions_to_optimize_bytecode()))
# self.source_stack = utils.compute_stack_size(map(lambda x: x.disasm, self.instructions_to_optimize_bytecode()))

def add_instruction(self, new_instr: CFGInstruction) -> None:
self._instructions.append(new_instr)
# TODO
#self.source_stack = utils.compute_stack_size(map(lambda x: x.disasm, self.instructions_to_optimize_bytecode()))
# self.source_stack = utils.compute_stack_size(map(lambda x: x.disasm, self.instructions_to_optimize_bytecode()))

def add_comes_from(self, block_id: str) -> None:
self._comes_from.append(block_id)
Expand All @@ -139,16 +139,16 @@ def get_comes_from(self) -> List[str]:
def set_comes_from(self, new_comes_from: List[str]) -> None:
self._comes_from = new_comes_from

def set_jump_type(self, t : str) -> None:
if t not in ["conditional","unconditional","terminal", "falls_to", "sub_block", "split_instruction_block"]:
def set_jump_type(self, t: str) -> None:
if t not in ["conditional", "unconditional", "terminal", "falls_to", "sub_block", "split_instruction_block"]:
raise Exception("Wrong jump type")
else:
self._jump_type = t

def set_jump_to(self, blockId : str) -> None:
def set_jump_to(self, blockId: str) -> None:
self._jump_to = blockId

def set_falls_to(self, blockId :str) -> None:
def set_falls_to(self, blockId: str) -> None:
self._falls_to = blockId

def set_length(self) -> int:
Expand Down Expand Up @@ -184,10 +184,10 @@ def set_jump_info(self, exit_info: Dict[str, Any]) -> None:
self._process_instructions_from_jump()

elif type_block in ["Terminated"]:
#We do not store the direction as itgenerates a loop
# We do not store the direction as itgenerates a loop
self._jump_type = "terminal"
elif type_block in [""]:
#It corresponds to falls_to blocks
# It corresponds to falls_to blocks
self._jump_type = "falls_to"
elif type_block in ["MainExit"]:
self._jump_type = "terminal"
Expand All @@ -200,25 +200,24 @@ def process_function_calls(self, function_ids):
self.function_calls = set(calls)

def check_validity_arguments(self):
'''
"""
It checks for each instruction in the block that there is not
any previous instruction that uses as input argument the variable
that is generating as output (there is not aliasing).
'''
"""

for i in range(len(self._instructions)):
instr = self._instructions[i]
out_var = instr.get_out_args()
if len(out_var) > 0:
out_var_set = set(out_var)
pred_inputs = map(lambda x: set(x.get_in_args()).intersection(out_var_set),self._instructions[:i+1])
pred_inputs = map(lambda x: set(x.get_in_args()).intersection(out_var_set), self._instructions[:i + 1])
candidates = list(filter(lambda x: x != set(), pred_inputs))
if len(candidates) != 0:
print("[WARNING]: Aliasing between variables!")


def _process_dependences(self, instructions, map_positions):

def _process_dependences(self, instructions: List[CFGInstruction],
map_positions: Dict[int, instr_id_T]) -> List[Tuple[instr_id_T, instr_id_T]]:
sto_dep = self._compute_storage_dependences(instructions)
sto_dep = self._simplify_dependences(sto_dep)
sto_deps = replace_pos_instrsid(sto_dep, map_positions)
Expand All @@ -228,58 +227,61 @@ def _process_dependences(self, instructions, map_positions):
mem_deps = replace_pos_instrsid(mem_dep, map_positions)
return sto_deps, mem_deps

def _compute_storage_dependences(self,instructions):
def _compute_storage_dependences(self, instructions: List[CFGInstruction]):
sto_ins = []
# print(instructions)
for i in range(len(instructions)):
ins = instructions[i]
if ins.get_op_name() in ["sload","sstore"]:
if ins.get_op_name() in ["sload", "sstore"]:
v = ins.get_in_args()[0]
input_val = get_expression(v, instructions[:i])
sto_ins.append([i,input_val,ins.get_type_mem_op()])
#elif ins.get_op_name() in ["call","delegatecall","staticcall","callcode"]:
sto_ins.append([i, input_val, ins.get_type_mem_op()])
# elif ins.get_op_name() in ["call","delegatecall","staticcall","callcode"]:
# sto_ins.append([i,["inf"],"write"])

deps = [[sto_ins[i][0],j[0]] for i in range(len(sto_ins)) for j in sto_ins[i+1:] if are_dependent_accesses(sto_ins[i][1],j[1]) and generate_dep(sto_ins[i][2], j[2])]
deps = [[sto_ins[i][0], j[0]] for i in range(len(sto_ins)) for j in sto_ins[i + 1:] if
are_dependent_accesses(sto_ins[i][1], j[1]) and generate_dep(sto_ins[i][2], j[2])]
# print("DEPS: "+str(deps))
# print("******")
return deps

def _compute_memory_dependences(self, instructions):
def _compute_memory_dependences(self, instructions: List[CFGInstruction]):
mem_ins = []

mem_instrs_access = ["mload", "mstore", "mstore8"]
mem_instrs_offset = ["keccak256"]#, "codecopy","extcodecopy","calldatacopy","returndatacopy","mcopy","log0","log1","log2","log3","log4","create","create2","call","delegatecall","staticcall","callcode"]

mem_instrs_offset = [
"keccak256"] # , "codecopy","extcodecopy","calldatacopy","returndatacopy","mcopy","log0","log1","log2","log3","log4","create","create2","call","delegatecall","staticcall","callcode"]

for i in range(len(instructions)):
ins = instructions[i]
if ins.get_op_name() in mem_instrs_access:
v = ins.get_in_args()[0]
input_val = get_expression(v, instructions[:i])
interval = [input_val, ["0x20"]]
mem_ins.append([i,interval,ins.get_type_mem_op()])
mem_ins.append([i, interval, ins.get_type_mem_op()])

elif ins.get_op_name() in mem_instrs_offset:
values = ins.get_in_args()
elif ins.get_op_name() in mem_instrs_offset:
values = ins.get_in_args()

interval_args = get_interval(ins.get_op_name(),values)
interval_args = get_interval(ins.get_op_name(), values)

if ins.get_op_name() not in ["call","callcode","delegatecall","staticcall"]:
if ins.get_op_name() not in ["call", "callcode", "delegatecall", "staticcall"]:
input_vals = list(map(lambda x: get_expression(x, instructions[:i]), interval_args))
interval = [input_vals[0],input_vals[1]]
mem_ins.append([i,interval,ins.get_type_mem_op()])
interval = [input_vals[0], input_vals[1]]
mem_ins.append([i, interval, ins.get_type_mem_op()])

# else:

# input_vals = list(map(lambda x: get_expression(x, instructions[:i]), interval_args[0]))
# interval = [input_vals[0],input_vals[1]]
# mem_ins.append([i,interval,"read"])

# input_vals = list(map(lambda x: get_expression(x, instructions[:i]), interval_args[1]))
# interval = [input_vals[0],input_vals[1]]
# mem_ins.append([i,interval, "write"])

deps = [[mem_ins[i][0],j[0]] for i in range(len(mem_ins)) for j in mem_ins[i+1:] if are_dependent_interval(mem_ins[i][1],j[1]) and generate_dep(mem_ins[i][2], j[2])]
deps = [[mem_ins[i][0], j[0]] for i in range(len(mem_ins)) for j in mem_ins[i + 1:] if
are_dependent_interval(mem_ins[i][1], j[1]) and generate_dep(mem_ins[i][2], j[2])]
# print("DEPS: "+str(deps))
# print("******")
return deps
Expand All @@ -300,26 +302,26 @@ def get_as_json(self):

block_json["instructions"] = instructions_json

block_json["exit"] = self.block_id+"Exit"
block_json["exit"] = self.block_id + "Exit"
block_json["type"] = "BasicBlock"

jump_block = {}

if self._jump_type == "conditional":
jump_block["id"] = self.block_id+"Exit"
jump_block["id"] = self.block_id + "Exit"
jump_block["instructions"] = []
jump_block["type"] = "ConditionalJump"
jump_block["exit"] = [self._falls_to, self._jump_to]
jump_block["cond"] = self._instructions[-1].get_out_args()

elif self._jump_type == "unconditional":
jump_block["id"] = self.block_id+"Exit"
jump_block["id"] = self.block_id + "Exit"
jump_block["instructions"] = []
jump_block["type"] = "Jump"
jump_block["exit"] = [self._jump_to]

elif self._jump_type == "mainExit":
jump_block["id"] = self.block_id+"Exit"
jump_block["id"] = self.block_id + "Exit"
jump_block["instructions"] = []
jump_block["type"] = "MainExit"
jump_block["exit"] = [self._jump_to]
Expand All @@ -331,7 +333,7 @@ def _get_vars_spec(self, uninter_instructions):
vars_spec = set()

for i in uninter_instructions:
all_vars = i["inpt_sk"]+i["outpt_sk"]
all_vars = i["inpt_sk"] + i["outpt_sk"]
for a in all_vars:
vars_spec.add(a)

Expand All @@ -355,7 +357,7 @@ def _build_spec_for_sequence(self, instructions, map_instructions: Dict, out_idx
jump_instr = None

for i in range(len(instructions)):
#Check if it has been already created
# Check if it has been already created

ins = instructions[i]

Expand All @@ -381,36 +383,37 @@ def _build_spec_for_sequence(self, instructions, map_instructions: Dict, out_idx

continue

ins_spec = map_instructions.get((ins.get_op_name().upper(),tuple(ins.get_in_args())), None)
ins_spec = map_instructions.get((ins.get_op_name().upper(), tuple(ins.get_in_args())), None)

if ins_spec is None:
if ins.get_op_name().startswith("assignment"):
result, new_out_idx = ins.build_spec_assigment(new_out_idx, instrs_idx, map_instructions)
else:
result, new_out_idx = ins.build_spec(new_out_idx, instrs_idx, map_instructions)

uninter_functions+=result
uninter_functions += result

map_positions_instructions[i] = result[-1]["id"]

# Assignments might be generated from phi functions
for out_val, in_val in self.assignment_dict.items():
# if is_used:

if in_val.startswith("0x"): #It is a push value
if in_val.startswith("0x"): # It is a push value
func = map_instructions.get(("PUSH", tuple([in_val])), -1)
if func == -1:
push_name = "PUSH" if int(in_val,16) != 0 else "PUSH0"
push_name = "PUSH" if int(in_val, 16) != 0 else "PUSH0"
inst_idx = instrs_idx.get(push_name, 0)
instrs_idx[push_name] = inst_idx+1
instrs_idx[push_name] = inst_idx + 1
push_ins = build_push_spec(in_val, inst_idx, [out_val])

map_instructions[("PUSH",tuple([in_val]))] = push_ins
map_instructions[("PUSH", tuple([in_val]))] = push_ins

uninter_functions.append(push_ins)

instr_repr = '\n'.join([instr.get_instruction_representation() for instr in self._instructions])
assignment_repr = '\n'.join([f"{out_value} = {in_value}" for out_value, in_value in self.assignment_dict.items()])
assignment_repr = '\n'.join(
[f"{out_value} = {in_value}" for out_value, in_value in self.assignment_dict.items()])

combined_repr = '\n'.join(repr_ for repr_ in [assignment_repr, instr_repr] if repr_ != "")

Expand All @@ -426,7 +429,7 @@ def _build_spec_for_sequence(self, instructions, map_instructions: Dict, out_idx
spec["memory_dependences"] = []
spec["storage_dependences"] = []

#They are not used in greedy algorithm
# They are not used in greedy algorithm
spec["init_progr_len"] = 0
spec["max_progr_len"] = 0
spec["min_length_instrs"] = 0
Expand Down Expand Up @@ -467,7 +470,7 @@ def _include_jump_tag(self, block_spec: Dict, out_idx: int, block_tags_dict: Dic
# out_idx = 0
# # print("BLOCK TAG", block_tag_idx)
# # print(self._instructions)

# for i in range(len(self._instructions)):
# ins = self._instructions[i]
# if ins.get_op_name().upper() in constants.split_block or ins.get_op_name() in self.function_calls:
Expand Down Expand Up @@ -497,8 +500,6 @@ def _include_jump_tag(self, block_spec: Dict, out_idx: int, block_tags_dict: Dic
# print(str(self.block_id)+"_"+str(cont-1))
# print(json.dumps(r, indent=4))



# #We reset the seq of instructions and the out_idx for next block
# ins_seq = []
# out_idx = 0
Expand All @@ -520,7 +521,6 @@ def _include_jump_tag(self, block_spec: Dict, out_idx: int, block_tags_dict: Dic
# if not self._jump_type in ["conditional","unconditional"]:
# print(str(self.block_id)+"_"+str(cont))
# print(json.dumps(r, indent=4))


# else:
# r = get_empty_spec()
Expand All @@ -538,7 +538,7 @@ def build_spec(self, block_tags_dict: Dict, block_tag_idx: int, initial_stack: L
final_stack: List[str]) -> Tuple[Dict[str, Any], int, int]:

map_instructions = {}

out_idx = 0

spec, out_idx, map_positions = self._build_spec_for_sequence(self._instructions, map_instructions,
Expand All @@ -548,19 +548,16 @@ def build_spec(self, block_tags_dict: Dict, block_tag_idx: int, initial_stack: L
spec["storage_dependences"] = sto_deps
spec["memory_dependences"] = mem_deps


#Just to print information if it is not a jump
if not self._jump_type in ["conditional","unconditional"]:
# Just to print information if it is not a jump
if not self._jump_type in ["conditional", "unconditional"]:
logging.debug(f"Building Spec of block {self.block_id}...")
logging.debug(json.dumps(spec, indent=4))

return spec, out_idx, block_tag_idx


def __str__(self):

s = "BlockID: " + self.block_id+ "\n"
s += "Type: " + self._jump_type+ "\n"
s = "BlockID: " + self.block_id + "\n"
s += "Type: " + self._jump_type + "\n"
s += "Jump to: " + str(self._jump_to) + "\n"
s += "Falls to: " + str(self._falls_to) + "\n"
s += "Comes_from: " + str(self._comes_from) + "\n"
Expand Down

0 comments on commit 5411f55

Please sign in to comment.