From 5411f555e417351e4a0bdd0d41b2a5cea2cb01eb Mon Sep 17 00:00:00 2001 From: alexcere <48130030+alexcere@users.noreply.github.com> Date: Mon, 7 Oct 2024 15:44:02 +0200 Subject: [PATCH] Formatting of CFG Block methods --- src/parser/cfg_block.py | 149 ++++++++++++++++++++-------------------- 1 file changed, 73 insertions(+), 76 deletions(-) diff --git a/src/parser/cfg_block.py b/src/parser/cfg_block.py index 67f0c201..423ecf14 100644 --- a/src/parser/cfg_block.py +++ b/src/parser/cfg_block.py @@ -1,5 +1,6 @@ import logging +from global_params.types import instr_id_T from parser.cfg_instruction import CFGInstruction, build_push_spec, build_pushtag_spec from parser.utils_parser import is_in_input_stack, is_in_output_stack, are_dependent_interval, get_empty_spec, \ get_expression, are_dependent_accesses, replace_pos_instrsid, generate_dep, get_interval @@ -16,51 +17,50 @@ function_tags = {} - def include_function_call_tags(ins, out_idx, block_spec): global function_tags global tag_idx - in_tag, out_tag = function_tags.get(ins.get_op_name(), (-1,-1)) + in_tag, out_tag = function_tags.get(ins.get_op_name(), (-1, -1)) if in_tag == -1 and out_tag == -1: out_tag = tag_idx - in_tag = tag_idx+1 - tag_idx+=2 + in_tag = tag_idx + 1 + tag_idx += 2 function_tags[ins.get_op_name()] = (in_tag, out_tag) in_tag_instr = build_pushtag_spec(out_idx, in_tag) - out_idx+=1 + out_idx += 1 out_tag_instr = build_pushtag_spec(out_idx, out_tag) - block_spec["user_instrs"]+=[in_tag_instr,out_tag_instr] + block_spec["user_instrs"] += [in_tag_instr, out_tag_instr] - #It adds the out jump label after the arguments of the function + # It adds the out jump label after the arguments of the function num_funct_arguments = len(ins.get_in_args()) - block_spec["tgt_ws"] = block_spec["tgt_ws"][:num_funct_arguments]+out_tag_instr["outpt_sk"]+block_spec["tgt_ws"][num_funct_arguments:] - - #It adds at top of the stack de input jump label - block_spec["tgt_ws"] = in_tag_instr["outpt_sk"]+block_spec["tgt_ws"] + block_spec["tgt_ws"] = block_spec["tgt_ws"][:num_funct_arguments] + out_tag_instr["outpt_sk"] + block_spec[ + "tgt_ws"][ + num_funct_arguments:] + # It adds at top of the stack de input jump label + block_spec["tgt_ws"] = in_tag_instr["outpt_sk"] + block_spec["tgt_ws"] - #It adds in variables the new identifier for the in and out jump label - block_spec["variables"]+=in_tag_instr["outpt_sk"]+ out_tag_instr["outpt_sk"] + # It adds in variables the new identifier for the in and out jump label + block_spec["variables"] += in_tag_instr["outpt_sk"] + out_tag_instr["outpt_sk"] - block_spec["yul_expressions"]+="\n"+ins.get_instruction_representation() + block_spec["yul_expressions"] += "\n" + ins.get_instruction_representation() return block_spec, out_idx - - class CFGBlock: """ Class for representing a cfg block """ - def __init__(self, identifier : str, instructions: List[CFGInstruction], type_block: str, assignment_dict: Dict[str, str]): + def __init__(self, identifier: str, instructions: List[CFGInstruction], type_block: str, + assignment_dict: Dict[str, str]): self.block_id = identifier self._instructions = instructions # minimum size of the source stack @@ -123,12 +123,12 @@ def set_instructions(self, new_instructions: List[CFGInstruction]) -> None: # Then we update the source stack size # TODO - #self.source_stack = utils.compute_stack_size(map(lambda x: x.disasm, self.instructions_to_optimize_bytecode())) + # self.source_stack = utils.compute_stack_size(map(lambda x: x.disasm, self.instructions_to_optimize_bytecode())) def add_instruction(self, new_instr: CFGInstruction) -> None: self._instructions.append(new_instr) # TODO - #self.source_stack = utils.compute_stack_size(map(lambda x: x.disasm, self.instructions_to_optimize_bytecode())) + # self.source_stack = utils.compute_stack_size(map(lambda x: x.disasm, self.instructions_to_optimize_bytecode())) def add_comes_from(self, block_id: str) -> None: self._comes_from.append(block_id) @@ -139,16 +139,16 @@ def get_comes_from(self) -> List[str]: def set_comes_from(self, new_comes_from: List[str]) -> None: self._comes_from = new_comes_from - def set_jump_type(self, t : str) -> None: - if t not in ["conditional","unconditional","terminal", "falls_to", "sub_block", "split_instruction_block"]: + def set_jump_type(self, t: str) -> None: + if t not in ["conditional", "unconditional", "terminal", "falls_to", "sub_block", "split_instruction_block"]: raise Exception("Wrong jump type") else: self._jump_type = t - def set_jump_to(self, blockId : str) -> None: + def set_jump_to(self, blockId: str) -> None: self._jump_to = blockId - def set_falls_to(self, blockId :str) -> None: + def set_falls_to(self, blockId: str) -> None: self._falls_to = blockId def set_length(self) -> int: @@ -184,10 +184,10 @@ def set_jump_info(self, exit_info: Dict[str, Any]) -> None: self._process_instructions_from_jump() elif type_block in ["Terminated"]: - #We do not store the direction as itgenerates a loop + # We do not store the direction as itgenerates a loop self._jump_type = "terminal" elif type_block in [""]: - #It corresponds to falls_to blocks + # It corresponds to falls_to blocks self._jump_type = "falls_to" elif type_block in ["MainExit"]: self._jump_type = "terminal" @@ -200,25 +200,24 @@ def process_function_calls(self, function_ids): self.function_calls = set(calls) def check_validity_arguments(self): - ''' + """ It checks for each instruction in the block that there is not any previous instruction that uses as input argument the variable that is generating as output (there is not aliasing). - ''' + """ for i in range(len(self._instructions)): instr = self._instructions[i] out_var = instr.get_out_args() if len(out_var) > 0: out_var_set = set(out_var) - pred_inputs = map(lambda x: set(x.get_in_args()).intersection(out_var_set),self._instructions[:i+1]) + pred_inputs = map(lambda x: set(x.get_in_args()).intersection(out_var_set), self._instructions[:i + 1]) candidates = list(filter(lambda x: x != set(), pred_inputs)) if len(candidates) != 0: print("[WARNING]: Aliasing between variables!") - - - def _process_dependences(self, instructions, map_positions): + def _process_dependences(self, instructions: List[CFGInstruction], + map_positions: Dict[int, instr_id_T]) -> List[Tuple[instr_id_T, instr_id_T]]: sto_dep = self._compute_storage_dependences(instructions) sto_dep = self._simplify_dependences(sto_dep) sto_deps = replace_pos_instrsid(sto_dep, map_positions) @@ -228,58 +227,61 @@ def _process_dependences(self, instructions, map_positions): mem_deps = replace_pos_instrsid(mem_dep, map_positions) return sto_deps, mem_deps - def _compute_storage_dependences(self,instructions): + def _compute_storage_dependences(self, instructions: List[CFGInstruction]): sto_ins = [] # print(instructions) for i in range(len(instructions)): ins = instructions[i] - if ins.get_op_name() in ["sload","sstore"]: + if ins.get_op_name() in ["sload", "sstore"]: v = ins.get_in_args()[0] input_val = get_expression(v, instructions[:i]) - sto_ins.append([i,input_val,ins.get_type_mem_op()]) - #elif ins.get_op_name() in ["call","delegatecall","staticcall","callcode"]: + sto_ins.append([i, input_val, ins.get_type_mem_op()]) + # elif ins.get_op_name() in ["call","delegatecall","staticcall","callcode"]: # sto_ins.append([i,["inf"],"write"]) - deps = [[sto_ins[i][0],j[0]] for i in range(len(sto_ins)) for j in sto_ins[i+1:] if are_dependent_accesses(sto_ins[i][1],j[1]) and generate_dep(sto_ins[i][2], j[2])] + deps = [[sto_ins[i][0], j[0]] for i in range(len(sto_ins)) for j in sto_ins[i + 1:] if + are_dependent_accesses(sto_ins[i][1], j[1]) and generate_dep(sto_ins[i][2], j[2])] # print("DEPS: "+str(deps)) # print("******") return deps - def _compute_memory_dependences(self, instructions): + def _compute_memory_dependences(self, instructions: List[CFGInstruction]): mem_ins = [] mem_instrs_access = ["mload", "mstore", "mstore8"] - mem_instrs_offset = ["keccak256"]#, "codecopy","extcodecopy","calldatacopy","returndatacopy","mcopy","log0","log1","log2","log3","log4","create","create2","call","delegatecall","staticcall","callcode"] - + mem_instrs_offset = [ + "keccak256"] # , "codecopy","extcodecopy","calldatacopy","returndatacopy","mcopy","log0","log1","log2","log3","log4","create","create2","call","delegatecall","staticcall","callcode"] + for i in range(len(instructions)): ins = instructions[i] if ins.get_op_name() in mem_instrs_access: v = ins.get_in_args()[0] input_val = get_expression(v, instructions[:i]) interval = [input_val, ["0x20"]] - mem_ins.append([i,interval,ins.get_type_mem_op()]) + mem_ins.append([i, interval, ins.get_type_mem_op()]) - elif ins.get_op_name() in mem_instrs_offset: - values = ins.get_in_args() + elif ins.get_op_name() in mem_instrs_offset: + values = ins.get_in_args() - interval_args = get_interval(ins.get_op_name(),values) + interval_args = get_interval(ins.get_op_name(), values) - if ins.get_op_name() not in ["call","callcode","delegatecall","staticcall"]: + if ins.get_op_name() not in ["call", "callcode", "delegatecall", "staticcall"]: input_vals = list(map(lambda x: get_expression(x, instructions[:i]), interval_args)) - interval = [input_vals[0],input_vals[1]] - mem_ins.append([i,interval,ins.get_type_mem_op()]) - + interval = [input_vals[0], input_vals[1]] + mem_ins.append([i, interval, ins.get_type_mem_op()]) + # else: - + # input_vals = list(map(lambda x: get_expression(x, instructions[:i]), interval_args[0])) # interval = [input_vals[0],input_vals[1]] # mem_ins.append([i,interval,"read"]) - + # input_vals = list(map(lambda x: get_expression(x, instructions[:i]), interval_args[1])) # interval = [input_vals[0],input_vals[1]] # mem_ins.append([i,interval, "write"]) - deps = [[mem_ins[i][0],j[0]] for i in range(len(mem_ins)) for j in mem_ins[i+1:] if are_dependent_interval(mem_ins[i][1],j[1]) and generate_dep(mem_ins[i][2], j[2])] + deps = [[mem_ins[i][0], j[0]] for i in range(len(mem_ins)) for j in mem_ins[i + 1:] if + are_dependent_interval(mem_ins[i][1], j[1]) and generate_dep(mem_ins[i][2], j[2])] # print("DEPS: "+str(deps)) # print("******") return deps @@ -300,26 +302,26 @@ def get_as_json(self): block_json["instructions"] = instructions_json - block_json["exit"] = self.block_id+"Exit" + block_json["exit"] = self.block_id + "Exit" block_json["type"] = "BasicBlock" jump_block = {} if self._jump_type == "conditional": - jump_block["id"] = self.block_id+"Exit" + jump_block["id"] = self.block_id + "Exit" jump_block["instructions"] = [] jump_block["type"] = "ConditionalJump" jump_block["exit"] = [self._falls_to, self._jump_to] jump_block["cond"] = self._instructions[-1].get_out_args() elif self._jump_type == "unconditional": - jump_block["id"] = self.block_id+"Exit" + jump_block["id"] = self.block_id + "Exit" jump_block["instructions"] = [] jump_block["type"] = "Jump" jump_block["exit"] = [self._jump_to] elif self._jump_type == "mainExit": - jump_block["id"] = self.block_id+"Exit" + jump_block["id"] = self.block_id + "Exit" jump_block["instructions"] = [] jump_block["type"] = "MainExit" jump_block["exit"] = [self._jump_to] @@ -331,7 +333,7 @@ def _get_vars_spec(self, uninter_instructions): vars_spec = set() for i in uninter_instructions: - all_vars = i["inpt_sk"]+i["outpt_sk"] + all_vars = i["inpt_sk"] + i["outpt_sk"] for a in all_vars: vars_spec.add(a) @@ -355,7 +357,7 @@ def _build_spec_for_sequence(self, instructions, map_instructions: Dict, out_idx jump_instr = None for i in range(len(instructions)): - #Check if it has been already created + # Check if it has been already created ins = instructions[i] @@ -381,7 +383,7 @@ def _build_spec_for_sequence(self, instructions, map_instructions: Dict, out_idx continue - ins_spec = map_instructions.get((ins.get_op_name().upper(),tuple(ins.get_in_args())), None) + ins_spec = map_instructions.get((ins.get_op_name().upper(), tuple(ins.get_in_args())), None) if ins_spec is None: if ins.get_op_name().startswith("assignment"): @@ -389,7 +391,7 @@ def _build_spec_for_sequence(self, instructions, map_instructions: Dict, out_idx else: result, new_out_idx = ins.build_spec(new_out_idx, instrs_idx, map_instructions) - uninter_functions+=result + uninter_functions += result map_positions_instructions[i] = result[-1]["id"] @@ -397,20 +399,21 @@ def _build_spec_for_sequence(self, instructions, map_instructions: Dict, out_idx for out_val, in_val in self.assignment_dict.items(): # if is_used: - if in_val.startswith("0x"): #It is a push value + if in_val.startswith("0x"): # It is a push value func = map_instructions.get(("PUSH", tuple([in_val])), -1) if func == -1: - push_name = "PUSH" if int(in_val,16) != 0 else "PUSH0" + push_name = "PUSH" if int(in_val, 16) != 0 else "PUSH0" inst_idx = instrs_idx.get(push_name, 0) - instrs_idx[push_name] = inst_idx+1 + instrs_idx[push_name] = inst_idx + 1 push_ins = build_push_spec(in_val, inst_idx, [out_val]) - map_instructions[("PUSH",tuple([in_val]))] = push_ins + map_instructions[("PUSH", tuple([in_val]))] = push_ins uninter_functions.append(push_ins) instr_repr = '\n'.join([instr.get_instruction_representation() for instr in self._instructions]) - assignment_repr = '\n'.join([f"{out_value} = {in_value}" for out_value, in_value in self.assignment_dict.items()]) + assignment_repr = '\n'.join( + [f"{out_value} = {in_value}" for out_value, in_value in self.assignment_dict.items()]) combined_repr = '\n'.join(repr_ for repr_ in [assignment_repr, instr_repr] if repr_ != "") @@ -426,7 +429,7 @@ def _build_spec_for_sequence(self, instructions, map_instructions: Dict, out_idx spec["memory_dependences"] = [] spec["storage_dependences"] = [] - #They are not used in greedy algorithm + # They are not used in greedy algorithm spec["init_progr_len"] = 0 spec["max_progr_len"] = 0 spec["min_length_instrs"] = 0 @@ -467,7 +470,7 @@ def _include_jump_tag(self, block_spec: Dict, out_idx: int, block_tags_dict: Dic # out_idx = 0 # # print("BLOCK TAG", block_tag_idx) # # print(self._instructions) - + # for i in range(len(self._instructions)): # ins = self._instructions[i] # if ins.get_op_name().upper() in constants.split_block or ins.get_op_name() in self.function_calls: @@ -497,8 +500,6 @@ def _include_jump_tag(self, block_spec: Dict, out_idx: int, block_tags_dict: Dic # print(str(self.block_id)+"_"+str(cont-1)) # print(json.dumps(r, indent=4)) - - # #We reset the seq of instructions and the out_idx for next block # ins_seq = [] # out_idx = 0 @@ -520,7 +521,6 @@ def _include_jump_tag(self, block_spec: Dict, out_idx: int, block_tags_dict: Dic # if not self._jump_type in ["conditional","unconditional"]: # print(str(self.block_id)+"_"+str(cont)) # print(json.dumps(r, indent=4)) - # else: # r = get_empty_spec() @@ -538,7 +538,7 @@ def build_spec(self, block_tags_dict: Dict, block_tag_idx: int, initial_stack: L final_stack: List[str]) -> Tuple[Dict[str, Any], int, int]: map_instructions = {} - + out_idx = 0 spec, out_idx, map_positions = self._build_spec_for_sequence(self._instructions, map_instructions, @@ -548,19 +548,16 @@ def build_spec(self, block_tags_dict: Dict, block_tag_idx: int, initial_stack: L spec["storage_dependences"] = sto_deps spec["memory_dependences"] = mem_deps - - #Just to print information if it is not a jump - if not self._jump_type in ["conditional","unconditional"]: + # Just to print information if it is not a jump + if not self._jump_type in ["conditional", "unconditional"]: logging.debug(f"Building Spec of block {self.block_id}...") logging.debug(json.dumps(spec, indent=4)) return spec, out_idx, block_tag_idx - def __str__(self): - - s = "BlockID: " + self.block_id+ "\n" - s += "Type: " + self._jump_type+ "\n" + s = "BlockID: " + self.block_id + "\n" + s += "Type: " + self._jump_type + "\n" s += "Jump to: " + str(self._jump_to) + "\n" s += "Falls to: " + str(self._falls_to) + "\n" s += "Comes_from: " + str(self._comes_from) + "\n"