Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dotnet: add support for basic blocks #1326

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions capa/features/extractors/dnfile/basicblock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from typing import Tuple, Iterator

from dncil.cil.instruction import Instruction

from capa.features.common import Feature, Characteristic
from capa.features.address import Address
from capa.features.basicblock import BasicBlock
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle


def extract_bb_stackstring(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract stackstring indicators from basic block"""
raise NotImplementedError


def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract tight loop indicators from a basic block"""
first: Instruction = bbh.inner.instructions[0]
last: Instruction = bbh.inner.instructions[-1]

if any((last.is_br(), last.is_cond_br(), last.is_leave())):
if last.operand == first.offset:
yield Characteristic("tight loop"), bbh.address


def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""extract basic block features"""
for bb_handler in BASIC_BLOCK_HANDLERS:
for feature, addr in bb_handler(fh, bbh):
yield feature, addr
yield BasicBlock(), bbh.address


BASIC_BLOCK_HANDLERS = (
extract_bb_tight_loop,
# extract_bb_stackstring,
)
103 changes: 93 additions & 10 deletions capa/features/extractors/dnfile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@

from __future__ import annotations

from typing import Dict, List, Tuple, Union, Iterator, Optional
from typing import Set, Dict, List, Tuple, Union, Iterator, Optional

import dnfile
from dncil.cil.opcode import OpCodes
from dncil.cil.instruction import Instruction

import capa.features.extractors
import capa.features.extractors.dotnetfile
import capa.features.extractors.dnfile.file
import capa.features.extractors.dnfile.insn
import capa.features.extractors.dnfile.function
import capa.features.extractors.dnfile.basicblock
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress
from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod
from capa.features.extractors.dnfile.types import DnType, DnBasicBlock, DnUnmanagedMethod
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
from capa.features.extractors.dnfile.helpers import (
get_dotnet_types,
Expand Down Expand Up @@ -98,7 +100,13 @@ def get_functions(self) -> Iterator[FunctionHandle]:
fh: FunctionHandle = FunctionHandle(
address=DNTokenAddress(token),
inner=method,
ctx={"pe": self.pe, "calls_from": set(), "calls_to": set(), "cache": self.token_cache},
ctx={
"pe": self.pe,
"calls_from": set(),
"calls_to": set(),
"blocks": list(),
"cache": self.token_cache,
},
)

# method tokens should be unique
Expand Down Expand Up @@ -127,21 +135,96 @@ def get_functions(self) -> Iterator[FunctionHandle]:
# those calls to other MethodDef methods e.g. calls to imported MemberRef methods
fh.ctx["calls_from"].add(address)

# calculate basic blocks
for fh in methods.values():

# calculate basic block leaders where,
# 1. The first instruction of the intermediate code is a leader
# 2. Instructions that are targets of unconditional or conditional jump/goto statements are leaders
# 3. Instructions that immediately follow unconditional or conditional jump/goto statements are considered leaders
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3: ... immediately follow unconditional ...

can you convince me of this? i think the instruction must be the target of a branch in order to be a leader. otherwise, its unreferenced code.

# https://www.geeksforgeeks.org/basic-blocks-in-compiler-design/

leaders: Set[int] = set()
for (idx, insn) in enumerate(fh.inner.instructions):
if idx == 0:
# add #1
leaders.add(insn.offset)
Comment on lines +148 to +150
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could pull this out of the loop so its not re-checked on each iteration


if any((insn.is_br(), insn.is_cond_br(), insn.is_leave())):
# add #2
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# add #2
# add #2: targets of branches are leaders

leaders.add(insn.operand)
# add #3
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# add #3
# add #3: fallthrough instructions after conditional branches are leaders

try:
leaders.add(fh.inner.instructions[idx + 1].offset)
except IndexError:
# may encounter branch at end of method
continue

# build basic blocks using leaders
bb_curr: Optional[DnBasicBlock] = None
for (idx, insn) in enumerate(fh.inner.instructions):
if insn.offset in leaders:
# new leader, new basic block
bb_curr = DnBasicBlock(instructions=[insn])
fh.ctx["blocks"].append(bb_curr)
continue
assert bb_curr is not None

bb_curr.instructions.append(insn)
mr-tz marked this conversation as resolved.
Show resolved Hide resolved

# create mapping of first instruction to basic block
bb_map: Dict[int, DnBasicBlock] = {}
for bb in fh.ctx["blocks"]:
if len(bb.instructions) == 0:
# TODO: consider error?
continue
Comment on lines +177 to +179
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if len(bb.instructions) == 0:
# TODO: consider error?
continue
assert len(bb.instructions) > 0

i dont see how this can ever not be the case, given that the bb is initialized with at least one element above. so, we could keep this assertion around, or also remove it.

bb_map[bb.instructions[0].offset] = bb

# connect basic blocks
for (idx, bb) in enumerate(fh.ctx["blocks"]):
if len(bb.instructions) == 0:
# TODO: consider error?
continue
Comment on lines +184 to +186
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if len(bb.instructions) == 0:
# TODO: consider error?
continue
assert len(bb.instructions) > 0

ditto


last = bb.instructions[-1]

# connect branches to other basic blocks
if any((last.is_br(), last.is_cond_br(), last.is_leave())):
bb_branch: Optional[DnBasicBlock] = bb_map.get(last.operand, None)
if bb_branch is not None:
# TODO: consider None error?
bb.succs.append(bb_branch)
bb_branch.preds.append(bb)

if any((last.is_br(), last.is_leave())):
# no fallthrough
continue

# connect fallthrough
try:
bb_next: DnBasicBlock = fh.ctx["blocks"][idx + 1]
bb.succs.append(bb_next)
Comment on lines +204 to +205
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this always hold just based on the next index?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is my thought process (please check my sanity 🙃):

  • We process instructions sequentially resulting in leaders getting processed sequentially
  • Each time we encounter a leader we create a new basic block and add the block to our list resulting in basic blocks getting processed sequentially
  • Therefore, if a basic block has a fallthrough successor we can assume the successor is the next basic block in our list

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense and should go in there as a comment.

What about BBs that do not return (IDA calls them fcb_noret and fcb_enoret)? They occur for example on call abort, RtlFailFast, or potentially on exception handling.

Another corner case from IDA at the call 0x100304ae:
2023-03-01_09-42-28_ida

ID: 8, Start: 0x1003045e, Last instruction: 0x10030465, Size: 12, Type: fcb_normal
  -> ID: 16, Start: 0x100304ae, Last instruction: 0x100304b7, Size: 10, Type: fcb_normal

So we should handle call and jmp at the end of BBs. And maybe exception handling could throw us of?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree the above should be in a comment, just for clarity.

i'm not sure whether or not .NET supports non-returning functions. its possibly that the runtime still requires the ret even if its dead code. we should try to figure this out and definitely document the results here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other assumptions we should probably document (and please verify):

  • .NET doesn't support tail calls, i.e. jmp to other routine
  • .NET doesn't support shared function chunks, i.e. two different entry points to the same function body

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and, right now i see that we're not doing anything with exceptions. i'm not sure how we'd want to represent this in the CFG. there would be a couple things to consider:

  • is there a separate graph for each exception handler? how to refer to them?
  • how to link the try/catch region with the exception handler graph?
  • the end of a try/catch region probably splits a basic block

we may want to figure out how we need to operate on exception handlers before finalizing the decisions here. so maybe this is not a blocker, but something we add once we know we need it.

bb_next.preds.append(bb)
except IndexError:
continue
Comment on lines +203 to +208
Copy link
Collaborator

@williballenthin williballenthin Mar 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try:
bb_next: DnBasicBlock = fh.ctx["blocks"][idx + 1]
bb.succs.append(bb_next)
bb_next.preds.append(bb)
except IndexError:
continue
try:
bb_next: DnBasicBlock = fh.ctx["blocks"][idx + 1]
except IndexError:
continue
else:
bb.succs.append(bb_next)
bb_next.preds.append(bb)

make region that can throw and handle the exception as small as possible for clarity

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the IndexError a programming error? i suppose we'd hit that case at ret and/or any other instruction at end of function body. can we codify this in any way? not a priority.


yield from methods.values()

def extract_function_features(self, fh) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.dnfile.function.extract_features(fh)

def get_basic_blocks(self, f) -> Iterator[BBHandle]:
# each dotnet method is considered 1 basic block
yield BBHandle(
address=f.address,
inner=f.inner,
)
def get_basic_blocks(self, fh) -> Iterator[BBHandle]:
for bb in fh.ctx["blocks"]:
yield BBHandle(
address=DNTokenOffsetAddress(
fh.address, bb.instructions[0].offset - (fh.inner.offset + fh.inner.header_size)
),
inner=bb,
)

def extract_basic_block_features(self, fh, bbh):
# we don't support basic block features
mike-hunhoff marked this conversation as resolved.
Show resolved Hide resolved
yield from []
yield from capa.features.extractors.dnfile.basicblock.extract_features(fh, bbh)

def get_instructions(self, fh, bbh):
for insn in bbh.inner.instructions:
Expand Down
16 changes: 14 additions & 2 deletions capa/features/extractors/dnfile/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from capa.features.common import Feature, Characteristic
from capa.features.address import Address
from capa.features.extractors import loops
from capa.features.extractors.base_extractor import FunctionHandle

logger = logging.getLogger(__name__)
Expand All @@ -38,7 +39,13 @@ def extract_recursive_call(fh: FunctionHandle) -> Iterator[Tuple[Characteristic,

def extract_function_loop(fh: FunctionHandle) -> Iterator[Tuple[Characteristic, Address]]:
"""extract loop indicators from a function"""
raise NotImplementedError()
edges = []
for bb in fh.ctx["blocks"]:
for succ in bb.succs:
edges.append((bb.instructions[0].offset, succ.instructions[0].offset))

if loops.has_loop(edges):
yield Characteristic("loop"), fh.address


def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
Expand All @@ -47,4 +54,9 @@ def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
yield feature, addr


FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_calls_from, extract_recursive_call)
FUNCTION_HANDLERS = (
extract_function_calls_to,
extract_function_calls_from,
extract_recursive_call,
extract_function_loop,
)
13 changes: 11 additions & 2 deletions capa/features/extractors/dnfile/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from enum import Enum
from typing import Union, Optional
from typing import TYPE_CHECKING, Dict, List, Optional

if TYPE_CHECKING:
from dncil.cil.instruction import Instruction


class DnType(object):
Expand Down Expand Up @@ -73,3 +75,10 @@ def __repr__(self):
@staticmethod
def format_name(module, method):
return f"{module}.{method}"


class DnBasicBlock:
def __init__(self, preds=None, succs=None, instructions=None):
self.succs: List[DnBasicBlock] = succs or []
self.preds: List[DnBasicBlock] = preds or []
self.instructions: List[Instruction] = instructions or []
19 changes: 11 additions & 8 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def get_function_by_token(extractor, token: int) -> FunctionHandle:
def get_basic_block(extractor, fh: FunctionHandle, va: int) -> BBHandle:
for bbh in extractor.get_basic_blocks(fh):
if isinstance(extractor, DnfileFeatureExtractor):
addr = bbh.inner.offset
addr = bbh.inner.instructions[0].offset
else:
addr = bbh.address
if addr == va:
Expand Down Expand Up @@ -741,9 +741,9 @@ def parametrize(params, values, **kwargs):
("hello-world", "file", capa.features.common.Class("System.Console"), True),
("hello-world", "file", capa.features.common.Namespace("System.Diagnostics"), True),
("hello-world", "function=0x250", capa.features.common.String("Hello World!"), True),
("hello-world", "function=0x250, bb=0x250, insn=0x252", capa.features.common.String("Hello World!"), True),
("hello-world", "function=0x250, bb=0x250, insn=0x257", capa.features.common.Class("System.Console"), True),
("hello-world", "function=0x250, bb=0x250, insn=0x257", capa.features.common.Namespace("System"), True),
("hello-world", "function=0x250, bb=0x251, insn=0x252", capa.features.common.String("Hello World!"), True),
("hello-world", "function=0x250, bb=0x251, insn=0x257", capa.features.common.Class("System.Console"), True),
("hello-world", "function=0x250, bb=0x251, insn=0x257", capa.features.common.Namespace("System"), True),
("hello-world", "function=0x250", capa.features.insn.API("System.Console::WriteLine"), True),
("hello-world", "file", capa.features.file.Import("System.Console::WriteLine"), True),
("_1c444", "file", capa.features.common.String(r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"), True),
Expand All @@ -758,6 +758,7 @@ def parametrize(params, values, **kwargs):
("_1c444", "token=0x6000018", capa.features.common.Characteristic("calls to"), False),
("_1c444", "token=0x600001D", capa.features.common.Characteristic("calls from"), True),
("_1c444", "token=0x600000F", capa.features.common.Characteristic("calls from"), False),
("_1c444", "token=0x600001D", capa.features.common.Characteristic("loop"), True),
("_1c444", "function=0x1F68", capa.features.insn.Number(0x0), True),
("_1c444", "function=0x1F68", capa.features.insn.Number(0x1), False),
("_692f", "token=0x6000004", capa.features.insn.API("System.Linq.Enumerable::First"), True), # generic method
Expand All @@ -773,7 +774,7 @@ def parametrize(params, values, **kwargs):
("_1c444", "token=0x6000020", capa.features.common.Class("Reqss.Reqss"), True), # ldftn
(
"_1c444",
"function=0x1F59, bb=0x1F59, insn=0x1F5B",
"function=0x1F59, bb=0x1F5A, insn=0x1F5B",
capa.features.common.Characteristic("unmanaged call"),
True,
),
Expand All @@ -782,11 +783,11 @@ def parametrize(params, values, **kwargs):
("_1c444", "token=0x6000088", capa.features.common.Characteristic("unmanaged call"), False),
(
"_1c444",
"function=0x1F68, bb=0x1F68, insn=0x1FF9",
"function=0x1F68, bb=0x1F74, insn=0x1FF9",
capa.features.insn.API("System.Drawing.Image::FromHbitmap"),
True,
),
("_1c444", "function=0x1F68, bb=0x1F68, insn=0x1FF9", capa.features.insn.API("FromHbitmap"), False),
("_1c444", "function=0x1F68, bb=0x1F74, insn=0x1FF9", capa.features.insn.API("FromHbitmap"), False),
(
"_1c444",
"token=0x600002B",
Expand Down Expand Up @@ -954,6 +955,7 @@ def parametrize(params, values, **kwargs):
("mimikatz", "file", capa.features.file.Import("cabinet.FCIAddFile"), True),
]


FEATURE_COUNT_TESTS = [
("mimikatz", "function=0x40E5C2", capa.features.basicblock.BasicBlock(), 7),
("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), 0),
Expand All @@ -962,8 +964,9 @@ def parametrize(params, values, **kwargs):
("mimikatz", "function=0x40B1F1", capa.features.common.Characteristic("calls to"), 3),
]


FEATURE_COUNT_TESTS_DOTNET = [
("_1c444", "token=0x06000072", capa.features.basicblock.BasicBlock(), 1),
("_1c444", "token=0x0600008C", capa.features.basicblock.BasicBlock(), 10),
("_1c444", "token=0x600001D", capa.features.common.Characteristic("calls to"), 1),
("_1c444", "token=0x600001D", capa.features.common.Characteristic("calls from"), 9),
]
Expand Down