Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Investigation framework and completed playbooks integration. Closes #173 #260

Merged
merged 17 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyintelowl/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .analyse import analyse
from .commands import analyzer_healthcheck, connector_healthcheck
from .config import config
from .investigations import investigations
from .jobs import jobs
from .tags import tags

Expand All @@ -9,9 +10,9 @@
config,
jobs,
tags,
investigations,
]


cmds = [
analyzer_healthcheck,
connector_healthcheck,
Expand Down
1 change: 1 addition & 0 deletions pyintelowl/cli/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def get_status_text(status: str, as_text=True):
"pending": ("#CE5C00", str(Emoji("gear"))),
"running": ("#CE5C00", str(Emoji("gear"))),
"reported_without_fails": ("#73D216", str(Emoji("heavy_check_mark"))),
"concluded": ("#73D216", str(Emoji("heavy_check_mark"))),
"reported_with_fails": ("#CC0000", str(Emoji("warning"))),
"failed": ("#CC0000", str(Emoji("cross_mark"))),
"killed": ("#CC0000", str(Emoji("cross_mark"))),
Expand Down
253 changes: 253 additions & 0 deletions pyintelowl/cli/investigations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
import json

import click
from rich import box
from rich import print as rprint
from rich.console import Console, Group
from rich.panel import Panel
from rich.table import Table

from pyintelowl import IntelOwlClientException
from pyintelowl.cli._utils import (
ClickContext,
add_options,
get_json_syntax,
get_status_text,
json_flag_option,
)


@click.group(help="Manage investigations")
def investigations():
pass


def _display_all_investigations(logger, rows):
console = Console()
table = Table(show_header=True, title="List of Investigations", box=box.DOUBLE_EDGE)
header_style = "bold blue"
table.add_column(header="Id", header_style=header_style)
table.add_column(header="Name", header_style=header_style)
table.add_column(header="Tags", header_style=header_style)
table.add_column(header="Description", header_style=header_style)
table.add_column(header="Owner", header_style=header_style)
table.add_column(header="TLP", header_style=header_style)
table.add_column(header="Total jobs", header_style=header_style)
table.add_column(header="Jobs", header_style=header_style)
table.add_column(header="Status", header_style=header_style)
try:
for el in rows:
table.add_row(
str(el["id"]),
el["name"],
", ".join([str(tag) for tag in el["tags"]]),
el["description"],
el["owner"],
el["tlp"],
str(el["total_jobs"]),
", ".join([str(job_id) for job_id in el["jobs"]]),
el["status"],
)
console.print(table, justify="center")
except Exception as e:
logger.fatal(e, exc_info=True)


@investigations.command(help="Delete job from investigation by their ID")
@click.argument("investigation_id", type=int)
@click.argument("job_id", type=int)
@click.pass_context
def rm(ctx: ClickContext, investigation_id: int, job_id: int):
ctx.obj.logger.info(
f"Requesting delete for Job [underline blue]#{job_id}[/] "
f"from Investigation #[underline blue]#{investigation_id}[/].."
)
try:
ctx.obj.delete_job_from_investigation(investigation_id, job_id)
except IntelOwlClientException as e:
ctx.obj.logger.fatal(str(e))


@investigations.command(
help="Add existing job to an existing investigation by their ID"
)
@click.argument("investigation_id", type=int)
@click.argument("job_id", type=int)
@click.pass_context
def add(ctx: ClickContext, investigation_id: int, job_id: int):
ctx.obj.logger.info(
f"Requesting add for Job [underline blue]#{job_id}[/] "
f"to Investigation #[underline blue]#{investigation_id}[/].."
)
try:
ctx.obj.add_job_to_investigation(investigation_id, job_id)
except IntelOwlClientException as e:
ctx.obj.logger.fatal(str(e))


def _render_investigation_attributes(data):
style = "[bold #31DDCF]"
tags = ", ".join(
data["tags"]
) # this is a [str], not a complex object like in job API
status: str = get_status_text(data["status"], as_text=False)
console = Console()
console.print(data)
r = Group(
f"{style}Investigation ID:[/] {str(data['id'])}",
f"{style}Name:[/] {data['name']}",
f"{style}Tags:[/] {tags}",
f"{style}Status:[/] {status}",
f"{style}TLP:[/] {data['tlp']}",
f"{style}Total jobs:[/] {data['total_jobs']}",
f"{style}Jobs ID:[/] {data['jobs']}",
f"{style}Description:[/] {data['description']}",
)
return Panel(r, title="Investigation attributes")


def _render_investigation_table(data, title: str):
headers = ["Name", "Owner", "Jobs"]
table = Table(
show_header=True,
title=title,
box=box.DOUBLE_EDGE,
show_lines=True,
)
# add headers
for h in headers:
table.add_column(h, header_style="bold blue")

# retrieve all jobs and childrens
table.add_row(
data.get("name", ""),
str(data.get("owner", "")),
get_json_syntax(data.get("jobs", [])),
)
return table


def _display_investigation(data):
console = Console()
attrs = _render_investigation_attributes(data)
with console.pager(styles=True):
console.print(attrs)


def _display_investigation_tree(data):
console = Console()
table = _render_investigation_table(data, title="Investigation report")
with console.pager(styles=True):
console.print(table, justify="center")


@investigations.command(
help="Tabular print investigation attributes and results for an investigation ID"
)
@click.argument("investigation_id", type=int)
@click.option(
"-r",
"--report",
type=click.Choice(
[
"analyzers",
"connectors",
]
),
default="analyzers",
show_choices=True,
help="""
Choose the type of report to be displayed:
analyzer or connector.
""",
)
@add_options(json_flag_option)
@click.pass_context
def view(
ctx: ClickContext,
investigation_id: int,
report: str,
as_json: bool,
):
ctx.obj.logger.info(
f"Requesting Investigation [underline blue]#{investigation_id}[/].."
)
try:
ans = ctx.obj.get_investigation_by_id(investigation_id)
except IntelOwlClientException as e:
ctx.obj.logger.fatal(str(e))

if as_json:
rprint(json.dumps(ans, indent=4))
else:
_display_investigation(ans)


@investigations.command(
help="Tabular print investigation's tree structure for an investigation ID"
)
@click.argument("investigation_id", type=int)
@click.option(
"-r",
"--report",
type=click.Choice(
[
"analyzers",
"connectors",
]
),
default="analyzers",
show_choices=True,
help="""
Choose the type of report to be displayed:
analyzer or connector.
""",
)
@add_options(json_flag_option)
@click.pass_context
def view_tree(
ctx: ClickContext,
investigation_id: int,
report: str,
as_json: bool,
):
ctx.obj.logger.info(
f"Requesting Investigation tree [underline blue]#{investigation_id}[/].."
)
try:
ans = ctx.obj.get_investigation_tree_by_id(investigation_id)
except IntelOwlClientException as e:
ctx.obj.logger.fatal(str(e))

if as_json:
rprint(json.dumps(ans, indent=4))
else:
_display_investigation_tree(ans)


@investigations.command(help="List all investigations")
@click.option(
"--status",
type=click.Choice(
["created", "running", "concluded"],
case_sensitive=False,
),
show_choices=True,
help="Only show investigations having a particular status",
)
@add_options(json_flag_option)
@click.pass_context
def ls(ctx: ClickContext, status: str, as_json: bool):
ctx.obj.logger.info("Requesting list of investigations..")
try:
ans = ctx.obj.get_all_investigations()
results = ans.get("results", [])
ctx.obj.logger.info(results)
if status:
ans = [el for el in results if el["status"].lower() == status.lower()]
if as_json:
rprint(json.dumps(ans, indent=4))
else:
_display_all_investigations(ctx.obj.logger, results)
except IntelOwlClientException as e:
ctx.obj.logger.fatal(str(e))
96 changes: 96 additions & 0 deletions pyintelowl/pyintelowl.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,102 @@ def get_job_by_id(self, job_id: Union[int, str]) -> Dict[str, Any]:
response = self.__make_request("GET", url=url)
return response.json()

def add_job_to_investigation(
self, investigation_id: Union[int, str], job_id: Union[int, str]
):
"""Add an existing job to an existing investigation.
Endpoint: ``/api/investigation/{job_id}/add_job``

Args:
job_id (Union[int, str]): Job ID
investigation_id (Union[int, str]): Investigation ID

Raises:
IntelOwlClientException: on client/HTTP error

Returns:
Dict[str, Any]: JSON body.
"""
url: str = self.instance + f"/api/investigation/{str(investigation_id)}/add_job"
data: dict = {"job": job_id}
response = self.__make_request("POST", url=url, data=data)
return response.json()

def delete_job_from_investigation(
self, investigation_id: Union[int, str], job_id: Union[int, str]
):
"""Delete a job from an existing investigation.
Endpoint: ``/api/investigation/{job_id}/remove_job``

Args:
job_id (Union[int, str]): Job ID
investigation_id (Union[int, str]): Investigation ID

Raises:
IntelOwlClientException: on client/HTTP error

Returns:
Dict[str, Any]: JSON body.
"""
url: str = (
self.instance + f"/api/investigation/{str(investigation_id)}/remove_job"
)
data: dict = {"job": job_id}
response = self.__make_request("POST", url=url, data=data)
return response.json()

def get_all_investigations(self) -> Dict[str, Any]:
"""Fetch all investigations info.
Endpoint: ``/api/investigation/``

Raises:
IntelOwlClientException: on client/HTTP error

Returns:
Dict[str, Any]: JSON body.
"""
url = self.instance + "/api/investigation"
response = self.__make_request("GET", url=url)
return response.json()

def get_investigation_by_id(
self, investigation_id: Union[int, str]
) -> Dict[str, Any]:
"""Fetch investigation info by ID.
Endpoint: ``/api/investigation/{job_id}``

Args:
investigation_id (Union[int, str]): Investigation ID to retrieve

Raises:
IntelOwlClientException: on client/HTTP error

Returns:
Dict[str, Any]: JSON body.
"""
url = self.instance + "/api/investigation/" + str(investigation_id)
response = self.__make_request("GET", url=url)
return response.json()

def get_investigation_tree_by_id(
self, investigation_id: Union[int, str]
) -> Dict[str, Any]:
"""Fetch investigation tree info by ID.
Endpoint: ``/api/investigation/{job_id}/tree``

Args:
investigation_id (Union[int, str]): Investigation ID to retrieve

Raises:
IntelOwlClientException: on client/HTTP error

Returns:
Dict[str, Any]: JSON body.
"""
url = self.instance + "/api/investigation/" + str(investigation_id) + "/tree"
response = self.__make_request("GET", url=url)
return response.json()

@staticmethod
def get_md5(
to_hash: AnyStr,
Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
MOCK_CONNECTIONS = True
TEST_JOB_ID = 1
TEST_INVESTIGATION_ID = 1
TEST_IP = "8.8.8.8"
TEST_DOMAIN = "www.google.com"
TEST_URL = "https://www.google.com/search?test"
Expand Down
Loading
Loading