diff --git a/.gitignore b/.gitignore index 25d4f69..2b2bbd2 100644 --- a/.gitignore +++ b/.gitignore @@ -124,4 +124,7 @@ venv.bak/ dmypy.json # Pyre type checker -.pyre/ \ No newline at end of file +.pyre/ + +Temp* +output* diff --git a/Dockerfile b/Dockerfile index 0b03a3a..ce1dfa0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,10 +1,9 @@ # This Dockerfile is a modified version of https://github.com/bskaggs/ghidra-docker/blob/master/Dockerfile FROM openjdk:11-slim -MAINTAINER anmarcel "anmarcel@cisco.com" -ARG GHIDRA_VERSION=9.1.2_PUBLIC_20200212 -ARG GHIDRA_SHA256=ebe3fa4e1afd7d97650990b27777bb78bd0427e8e70c1d0ee042aeb52decac61 +ARG GHIDRA_VERSION=10.0.4_PUBLIC_20210928 +ARG GHIDRA_SHA256=1ce9bdf2d7f6bdfe5dccd06da828af31bc74acfd800f71ade021d5211e820d5e RUN useradd -m ghidra && \ mkdir -p /srv/repositories && \ @@ -22,7 +21,7 @@ RUN apt-get install -y libgtk2.0 libidn11 libglu1-mesa WORKDIR /opt RUN apt-get update && apt-get install -y wget gettext-base patch && \ - wget -q -O ghidra.zip https://ghidra-sre.org/ghidra_${GHIDRA_VERSION}.zip + wget -q -O ghidra.zip https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_10.0.4_build/ghidra_${GHIDRA_VERSION}.zip RUN echo "${GHIDRA_SHA256} *ghidra.zip" | sha256sum -c && \ unzip ghidra.zip && \ rm ghidra.zip && \ diff --git a/config/config.json b/config/config.json index 789af03..2d0a6c2 100644 --- a/config/config.json +++ b/config/config.json @@ -2,7 +2,7 @@ "SAMPLES_DIR": "samples", "IDA_SAMPLES_DIR": "ida_samples", "GHIDRA_OUTPUT": "output", - "GHIDRA_PATH": "/Applications/ghidra_9.1.2_PUBLIC", + "GHIDRA_PATH": "/mnt/d/soft/Ghidra/ghidra_10.0.3_PUBLIC", "GHIDRA_SCRIPT": "ghidra_plugins", "GHIDRA_PROJECT": "ghidra_projects" } \ No newline at end of file diff --git a/flask_api.py b/flask_api.py index a04f9aa..67bb27d 100644 --- a/flask_api.py +++ b/flask_api.py @@ -41,7 +41,7 @@ app = Flask(__name__) # Load configuration -with open("config/config.json") as f_in: +with open('config/config.json') as f_in: j = json.load(f_in) SAMPLES_DIR = j['SAMPLES_DIR'] IDA_SAMPLES_DIR = j['IDA_SAMPLES_DIR'] @@ -52,10 +52,51 @@ GHIDRA_HEADLESS = os.path.join(GHIDRA_PATH, "support/analyzeHeadless") +def cmd_post_plugin(sha256, plugin_name, args = None): + output_path = os.path.join( + GHIDRA_OUTPUT, sha256 + plugin_name + "output.json") + + # assuming that plugins do not change the state of the binaries. Can be turned off later, if not needed + already_analyzed = ret_file_if_exists(output_path) + if already_analyzed: + return already_analyzed + #print(output_path) + project_path = os.path.join(GHIDRA_PROJECT, sha256 + ".gpr") + if os.path.isfile(project_path): + command = [sha256, "-process", sha256, "-noanalysis", "-scriptPath", GHIDRA_SCRIPT,"-postScript", plugin_name, args, output_path, "-log", "ghidra_log.txt"] if args else \ + [sha256, "-process", sha256, "-noanalysis", "-scriptPath", GHIDRA_SCRIPT,"-postScript", plugin_name, output_path, "-log", "ghidra_log.txt"] + + sub_cmd(command) + + result = ret_file_if_exists(output_path) + if result: + return result + else: + log.info("{} plugin failure".format(plugin_name)) + raise BadRequest("{} plugin failure".format(plugin_name)) + else: + raise BadRequest("Sample has not been analyzed") + +def sub_cmd(command): + log.debug("Ghidra analysis started") + p = subprocess.Popen([GHIDRA_HEADLESS, GHIDRA_PROJECT] + command, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + p.wait() + print(''.join(s.decode("utf-8") for s in list(p.stdout))) + log.debug("Ghidra analysis completed") + return p.stdout + ############################################# # UTILS # ############################################# +def ret_file_if_exists(output_path): + # Check if JSON response is available + if os.path.isfile(output_path): + with open(output_path) as f_in: + return (f_in.read(), 200) + + def set_logger(debug): """ Set logger level and syntax @@ -86,26 +127,11 @@ def server_init(): """ Server initialization: flask configuration, logging, etc. """ - # Check if SAMPLES_DIR folder is available - if not os.path.isdir(SAMPLES_DIR): - log.info("%s folder created" % SAMPLES_DIR) - os.mkdir(SAMPLES_DIR) - - # Check if IDA_SAMPLES_DIR folder is available - if not os.path.isdir(IDA_SAMPLES_DIR): - log.info("%s folder created" % IDA_SAMPLES_DIR) - os.mkdir(IDA_SAMPLES_DIR) - - # Check if GHIDRA_PROJECT folder is available - if not os.path.isdir(GHIDRA_PROJECT): - log.info("%s folder created" % GHIDRA_PROJECT) - os.mkdir(GHIDRA_PROJECT) - - # Check if GHIDRA_OUTPUT folder exists - if not os.path.isdir(GHIDRA_OUTPUT): - log.info("%s folder created" % GHIDRA_OUTPUT) - os.mkdir(GHIDRA_OUTPUT) - + # Check if needed folders is available + for folder in [SAMPLES_DIR, IDA_SAMPLES_DIR, GHIDRA_PROJECT, GHIDRA_OUTPUT]: + if not os.path.isdir(os.path.join('./', folder)): + log.info("%s folder created" % folder) + os.mkdir(folder) # 400 MB limit app.config["MAX_CONTENT_LENGTH"] = 400 * 1024 * 1024 @@ -154,20 +180,8 @@ def analyze_sample(): # Check if the sample has been analyzed project_path = os.path.join(GHIDRA_PROJECT, sha256 + ".gpr") if not os.path.isfile(project_path): - log.debug("Ghidra analysis started") - # Import the sample in Ghidra and perform the analysis - command = [GHIDRA_HEADLESS, - GHIDRA_PROJECT, - sha256, - "-import", - sample_path] - p = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - p.wait() - print(''.join(s.decode("utf-8") for s in list(p.stdout))) - log.debug("Ghidra analysis completed") - + sub_cmd([sha256, '-import',sample_path]) os.remove(sample_path) log.debug("Sample removed") return ("Analysis completed", 200) @@ -179,48 +193,14 @@ def analyze_sample(): raise BadRequest("Sample analysis failed") -@app.route("/ghidra/api/get_functions_list_detailed/") +@app.route("/ghidra/api/get_functions_list_detailed/", methods=["GET"]) def get_functions_list_detailed(sha256): """ Given the sha256 of a sample, returns the list of functions. - If the sample has not been analyzed, returns an error. + If the sample has not been analyzed, returns an error."FunctionsListA.py", """ try: - project_path = os.path.join(GHIDRA_PROJECT, sha256 + ".gpr") - # Check if the sample has been analyzed - if os.path.isfile(project_path): - output_path = os.path.join( - GHIDRA_OUTPUT, sha256 + "functions_list_a.json") - - command = [GHIDRA_HEADLESS, - GHIDRA_PROJECT, - sha256, - "-process", - sha256, - "-noanalysis", - "-scriptPath", - GHIDRA_SCRIPT, - "-postScript", - "FunctionsListA.py", - output_path, - "-log", - "ghidra_log.txt"] - # Execute Ghidra plugin - log.debug("Ghidra analysis started") - p = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - p.wait() - print(''.join(s.decode("utf-8") for s in list(p.stdout))) - log.debug("Ghidra analysis completed") - - # Check if JSON response is available - if os.path.isfile(output_path): - with open(output_path) as f_in: - return (f_in.read(), 200) - else: - raise BadRequest("FunctionsList plugin failure") - else: - raise BadRequest("Sample has not been analyzed") + return cmd_post_plugin(sha256, "FunctionsListA.py") except BadRequest: raise @@ -233,48 +213,16 @@ def get_functions_list_detailed(sha256): def get_functions_list(sha256): """ Given the sha256 of a sample, returns the list of functions. - If the sample has not been analyzed, returns an error. + If the sample has not been analyzed, returns an error. "FunctionsList.py" """ try: - project_path = os.path.join(GHIDRA_PROJECT, sha256 + ".gpr") - # Check if the sample has been analyzed - if os.path.isfile(project_path): - output_path = os.path.join( - GHIDRA_OUTPUT, sha256 + "functions_list.json") - command = [GHIDRA_HEADLESS, - GHIDRA_PROJECT, - sha256, - "-process", - sha256, - "-noanalysis", - "-scriptPath", - GHIDRA_SCRIPT, - "-postScript", - "FunctionsList.py", - output_path, - "-log", - "ghidra_log.txt"] - # Execute Ghidra plugin - log.debug("Ghidra analysis started") - p = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - p.wait() - print(''.join(s.decode("utf-8") for s in list(p.stdout))) - log.debug("Ghidra analysis completed") - - # Check if JSON response is available - if os.path.isfile(output_path): - with open(output_path) as f_in: - return (f_in.read(), 200) - else: - raise BadRequest("FunctionsList plugin failure") - else: - raise BadRequest("Sample has not been analyzed") + return cmd_post_plugin(sha256, "FunctionsList.py") except BadRequest: raise - except Exception: + except Exception as e: + raise e raise BadRequest("Sample analysis failed") @@ -283,45 +231,10 @@ def get_decompiled_function(sha256, offset): """ Given a sha256, and an offset, returns the decompiled code of the function. Returns an error if the sample has not been analyzed by Ghidra, - or if the offset does not correspond to a function + or if the offset does not correspond to a function "FunctionDecompile.py" """ try: - project_path = os.path.join(GHIDRA_PROJECT, sha256 + ".gpr") - # Check if the sample has been analyzed - if os.path.isfile(project_path): - output_path = os.path.join( - GHIDRA_OUTPUT, sha256 + "function_decompiled.json") - # Call the DecompileFunction Ghidra plugin - command = [GHIDRA_HEADLESS, - GHIDRA_PROJECT, - sha256, - "-process", - sha256, - "-noanalysis", - "-scriptPath", - GHIDRA_SCRIPT, - "-postScript", - "FunctionDecompile.py", - offset, - output_path, - "-log", - "ghidra_log.txt"] - # Execute Ghidra plugin - log.debug("Ghidra analysis started") - p = subprocess.Popen(command, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - p.wait() - print(''.join(s.decode("utf-8") for s in list(p.stdout))) - log.debug("Ghidra analysis completed") - - # Check if the JSON response is available - if os.path.isfile(output_path): - with open(output_path) as f_in: - return (f_in.read(), 200) - else: - raise BadRequest("FunctionDecompile plugin failure") - else: - raise BadRequest("Sample has not been analyzed") + return cmd_post_plugin(sha256, "FunctionDecompile.py", offset) except BadRequest: raise @@ -329,6 +242,26 @@ def get_decompiled_function(sha256, offset): except Exception: raise BadRequest("Sample analysis failed") +@app.route("/ghidra/api/script//") +def get_script_result(sha256, script): + """ + Given a sha256, and an offset, returns the decompiled code of the + function. Returns an error if the sample has not been analyzed by Ghidra, + or if the offset does not correspond to a function "FunctionDecompile.py" + """ + try: + scripts = os.listdir(GHIDRA_SCRIPT) + for s in scripts: + if os.path.splitext(s) == script: + args = request.args.get('args') + return cmd_post_plugin(sha256, s, args) + raise BadRequest("Script %s not found"%script) + except BadRequest: + raise + + except Exception: + raise BadRequest("Sample analysis failed") + @app.route("/ghidra/api/analysis_terminated/") def analysis_terminated(sha256): diff --git a/requirements.txt b/requirements.txt index 4857c7c..068c0bf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ coloredlogs==10.0 gunicorn==19.9.0 -Werkzeug==0.15.3 +Werkzeug==0.15.5 Flask==1.0.2 \ No newline at end of file diff --git a/tests/test.py b/tests/test.py index dab3285..1db5925 100644 --- a/tests/test.py +++ b/tests/test.py @@ -45,6 +45,7 @@ def sample_analysis(): def get_function_list(): r = requests.get("%s/get_functions_list/%s" % (URL, SHA256), timeout=300) + print(r.text) print("get_function_list status_code", r.status_code) if r.status_code == 200: return True