diff --git a/CHANGELOG.md b/CHANGELOG.md index ffb42ff6e9..a291fc8226 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master (unreleased) ### New Features +- dotnet: add support for basic blocks #1326 @mike-hunhoff ### Breaking Changes diff --git a/capa/features/extractors/dnfile/basicblock.py b/capa/features/extractors/dnfile/basicblock.py new file mode 100644 index 0000000000..2e48a6b693 --- /dev/null +++ b/capa/features/extractors/dnfile/basicblock.py @@ -0,0 +1,45 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +from typing import Tuple, Iterator + +from dncil.cil.instruction import Instruction + +from capa.features.common import Feature, Characteristic +from capa.features.address import Address +from capa.features.basicblock import BasicBlock +from capa.features.extractors.base_extractor import BBHandle, FunctionHandle + + +def extract_bb_stackstring(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: + """extract stackstring indicators from basic block""" + raise NotImplementedError + + +def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: + """extract tight loop indicators from a basic block""" + first: Instruction = bbh.inner.instructions[0] + last: Instruction = bbh.inner.instructions[-1] + + if any((last.is_br(), last.is_cond_br(), last.is_leave())): + if last.operand == first.offset: + yield Characteristic("tight loop"), bbh.address + + +def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: + """extract basic block features""" + for bb_handler in BASIC_BLOCK_HANDLERS: + for feature, addr in bb_handler(fh, bbh): + yield feature, addr + yield BasicBlock(), bbh.address + + +BASIC_BLOCK_HANDLERS = ( + extract_bb_tight_loop, + # extract_bb_stackstring, +) diff --git a/capa/features/extractors/dnfile/extractor.py b/capa/features/extractors/dnfile/extractor.py index ad180257dd..6dcad26821 100644 --- a/capa/features/extractors/dnfile/extractor.py +++ b/capa/features/extractors/dnfile/extractor.py @@ -8,19 +8,21 @@ from __future__ import annotations -from typing import Dict, List, Tuple, Union, Iterator, Optional +from typing import Set, Dict, List, Tuple, Union, Iterator, Optional import dnfile from dncil.cil.opcode import OpCodes +from dncil.cil.instruction import Instruction import capa.features.extractors import capa.features.extractors.dotnetfile import capa.features.extractors.dnfile.file import capa.features.extractors.dnfile.insn import capa.features.extractors.dnfile.function +import capa.features.extractors.dnfile.basicblock from capa.features.common import Feature from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress -from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod +from capa.features.extractors.dnfile.types import DnType, DnBasicBlock, DnUnmanagedMethod from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor from capa.features.extractors.dnfile.helpers import ( get_dotnet_types, @@ -98,7 +100,13 @@ def get_functions(self) -> Iterator[FunctionHandle]: fh: FunctionHandle = FunctionHandle( address=DNTokenAddress(token), inner=method, - ctx={"pe": self.pe, "calls_from": set(), "calls_to": set(), "cache": self.token_cache}, + ctx={ + "pe": self.pe, + "calls_from": set(), + "calls_to": set(), + "blocks": list(), + "cache": self.token_cache, + }, ) # method tokens should be unique @@ -127,26 +135,99 @@ def get_functions(self) -> Iterator[FunctionHandle]: # those calls to other MethodDef methods e.g. calls to imported MemberRef methods fh.ctx["calls_from"].add(address) + # calculate basic blocks + for fh in methods.values(): + # calculate basic block leaders where, + # 1. The first instruction of the intermediate code is a leader + # 2. Instructions that are targets of unconditional or conditional jump/goto statements are leaders + # 3. Instructions that immediately follow unconditional or conditional jump/goto statements are considered leaders + # https://www.geeksforgeeks.org/basic-blocks-in-compiler-design/ + + leaders: Set[int] = set() + for idx, insn in enumerate(fh.inner.instructions): + if idx == 0: + # add #1 + leaders.add(insn.offset) + + if any((insn.is_br(), insn.is_cond_br(), insn.is_leave())): + # add #2 + leaders.add(insn.operand) + # add #3 + try: + leaders.add(fh.inner.instructions[idx + 1].offset) + except IndexError: + # may encounter branch at end of method + continue + + # build basic blocks using leaders + bb_curr: Optional[DnBasicBlock] = None + for idx, insn in enumerate(fh.inner.instructions): + if insn.offset in leaders: + # new leader, new basic block + bb_curr = DnBasicBlock(instructions=[insn]) + fh.ctx["blocks"].append(bb_curr) + continue + + assert bb_curr is not None + bb_curr.instructions.append(insn) + + # create mapping of first instruction to basic block + bb_map: Dict[int, DnBasicBlock] = {} + for bb in fh.ctx["blocks"]: + if len(bb.instructions) == 0: + # TODO: consider error? + continue + bb_map[bb.instructions[0].offset] = bb + + # connect basic blocks + for idx, bb in enumerate(fh.ctx["blocks"]): + if len(bb.instructions) == 0: + # TODO: consider error? + continue + + last = bb.instructions[-1] + + # connect branches to other basic blocks + if any((last.is_br(), last.is_cond_br(), last.is_leave())): + bb_branch: Optional[DnBasicBlock] = bb_map.get(last.operand, None) + if bb_branch is not None: + # TODO: consider None error? + bb.succs.append(bb_branch) + bb_branch.preds.append(bb) + + if any((last.is_br(), last.is_leave())): + # no fallthrough + continue + + # connect fallthrough + try: + bb_next: DnBasicBlock = fh.ctx["blocks"][idx + 1] + bb.succs.append(bb_next) + bb_next.preds.append(bb) + except IndexError: + continue + yield from methods.values() def extract_function_features(self, fh) -> Iterator[Tuple[Feature, Address]]: yield from capa.features.extractors.dnfile.function.extract_features(fh) - def get_basic_blocks(self, f) -> Iterator[BBHandle]: - # each dotnet method is considered 1 basic block - yield BBHandle( - address=f.address, - inner=f.inner, - ) + def get_basic_blocks(self, fh) -> Iterator[BBHandle]: + for bb in fh.ctx["blocks"]: + yield BBHandle( + address=DNTokenOffsetAddress( + fh.address, bb.instructions[0].offset - (fh.inner.offset + fh.inner.header_size) + ), + inner=bb, + ) def extract_basic_block_features(self, fh, bbh): - # we don't support basic block features - yield from [] + yield from capa.features.extractors.dnfile.basicblock.extract_features(fh, bbh) def get_instructions(self, fh, bbh): for insn in bbh.inner.instructions: yield InsnHandle( - address=DNTokenOffsetAddress(bbh.address, insn.offset - (fh.inner.offset + fh.inner.header_size)), + address=DNTokenOffsetAddress(fh.address, insn.offset - (fh.inner.offset + fh.inner.header_size)), inner=insn, ) diff --git a/capa/features/extractors/dnfile/function.py b/capa/features/extractors/dnfile/function.py index 5ddde9a879..5c998d9d2f 100644 --- a/capa/features/extractors/dnfile/function.py +++ b/capa/features/extractors/dnfile/function.py @@ -13,6 +13,7 @@ from capa.features.common import Feature, Characteristic from capa.features.address import Address +from capa.features.extractors import loops from capa.features.extractors.base_extractor import FunctionHandle logger = logging.getLogger(__name__) @@ -38,7 +39,13 @@ def extract_recursive_call(fh: FunctionHandle) -> Iterator[Tuple[Characteristic, def extract_function_loop(fh: FunctionHandle) -> Iterator[Tuple[Characteristic, Address]]: """extract loop indicators from a function""" - raise NotImplementedError() + edges = [] + for bb in fh.ctx["blocks"]: + for succ in bb.succs: + edges.append((bb.instructions[0].offset, succ.instructions[0].offset)) + + if loops.has_loop(edges): + yield Characteristic("loop"), fh.address def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: @@ -47,4 +54,9 @@ def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: yield feature, addr -FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_calls_from, extract_recursive_call) +FUNCTION_HANDLERS = ( + extract_function_calls_to, + extract_function_calls_from, + extract_recursive_call, + extract_function_loop, +) diff --git a/capa/features/extractors/dnfile/types.py b/capa/features/extractors/dnfile/types.py index 822b5d6705..1cafa35ec0 100644 --- a/capa/features/extractors/dnfile/types.py +++ b/capa/features/extractors/dnfile/types.py @@ -6,8 +6,10 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from enum import Enum -from typing import Union, Optional +from typing import TYPE_CHECKING, Dict, List, Optional + +if TYPE_CHECKING: + from dncil.cil.instruction import Instruction class DnType(object): @@ -73,3 +75,10 @@ def __repr__(self): @staticmethod def format_name(module, method): return f"{module}.{method}" + + +class DnBasicBlock: + def __init__(self, preds=None, succs=None, instructions=None): + self.succs: List[DnBasicBlock] = succs or [] + self.preds: List[DnBasicBlock] = preds or [] + self.instructions: List[Instruction] = instructions or [] diff --git a/tests/fixtures.py b/tests/fixtures.py index ae336f5623..2369bb0dea 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -371,7 +371,7 @@ def get_function_by_token(extractor, token: int) -> FunctionHandle: def get_basic_block(extractor, fh: FunctionHandle, va: int) -> BBHandle: for bbh in extractor.get_basic_blocks(fh): if isinstance(extractor, DnfileFeatureExtractor): - addr = bbh.inner.offset + addr = bbh.inner.instructions[0].offset else: addr = bbh.address if addr == va: @@ -741,9 +741,9 @@ def parametrize(params, values, **kwargs): ("hello-world", "file", capa.features.common.Class("System.Console"), True), ("hello-world", "file", capa.features.common.Namespace("System.Diagnostics"), True), ("hello-world", "function=0x250", capa.features.common.String("Hello World!"), True), - ("hello-world", "function=0x250, bb=0x250, insn=0x252", capa.features.common.String("Hello World!"), True), - ("hello-world", "function=0x250, bb=0x250, insn=0x257", capa.features.common.Class("System.Console"), True), - ("hello-world", "function=0x250, bb=0x250, insn=0x257", capa.features.common.Namespace("System"), True), + ("hello-world", "function=0x250, bb=0x251, insn=0x252", capa.features.common.String("Hello World!"), True), + ("hello-world", "function=0x250, bb=0x251, insn=0x257", capa.features.common.Class("System.Console"), True), + ("hello-world", "function=0x250, bb=0x251, insn=0x257", capa.features.common.Namespace("System"), True), ("hello-world", "function=0x250", capa.features.insn.API("System.Console::WriteLine"), True), ("hello-world", "file", capa.features.file.Import("System.Console::WriteLine"), True), ("_1c444", "file", capa.features.common.String(r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"), True), @@ -758,6 +758,8 @@ def parametrize(params, values, **kwargs): ("_1c444", "token=0x6000018", capa.features.common.Characteristic("calls to"), False), ("_1c444", "token=0x600001D", capa.features.common.Characteristic("calls from"), True), ("_1c444", "token=0x600000F", capa.features.common.Characteristic("calls from"), False), + ("_1c444", "token=0x600001D", capa.features.common.Characteristic("loop"), True), + ("_1c444", "token=0x0600008C", capa.features.common.Characteristic("loop"), False), ("_1c444", "function=0x1F68", capa.features.insn.Number(0x0), True), ("_1c444", "function=0x1F68", capa.features.insn.Number(0x1), False), ("_692f", "token=0x6000004", capa.features.insn.API("System.Linq.Enumerable::First"), True), # generic method @@ -773,7 +775,7 @@ def parametrize(params, values, **kwargs): ("_1c444", "token=0x6000020", capa.features.common.Class("Reqss.Reqss"), True), # ldftn ( "_1c444", - "function=0x1F59, bb=0x1F59, insn=0x1F5B", + "function=0x1F59, bb=0x1F5A, insn=0x1F5B", capa.features.common.Characteristic("unmanaged call"), True, ), @@ -782,11 +784,11 @@ def parametrize(params, values, **kwargs): ("_1c444", "token=0x6000088", capa.features.common.Characteristic("unmanaged call"), False), ( "_1c444", - "function=0x1F68, bb=0x1F68, insn=0x1FF9", + "function=0x1F68, bb=0x1F74, insn=0x1FF9", capa.features.insn.API("System.Drawing.Image::FromHbitmap"), True, ), - ("_1c444", "function=0x1F68, bb=0x1F68, insn=0x1FF9", capa.features.insn.API("FromHbitmap"), False), + ("_1c444", "function=0x1F68, bb=0x1F74, insn=0x1FF9", capa.features.insn.API("FromHbitmap"), False), ( "_1c444", "token=0x600002B", @@ -954,6 +956,7 @@ def parametrize(params, values, **kwargs): ("mimikatz", "file", capa.features.file.Import("cabinet.FCIAddFile"), True), ] + FEATURE_COUNT_TESTS = [ ("mimikatz", "function=0x40E5C2", capa.features.basicblock.BasicBlock(), 7), ("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), 0), @@ -962,8 +965,9 @@ def parametrize(params, values, **kwargs): ("mimikatz", "function=0x40B1F1", capa.features.common.Characteristic("calls to"), 3), ] - FEATURE_COUNT_TESTS_DOTNET = [ + ("_1c444", "token=0x06000072", capa.features.basicblock.BasicBlock(), 1), + ("_1c444", "token=0x0600008C", capa.features.basicblock.BasicBlock(), 10), ("_1c444", "token=0x600001D", capa.features.common.Characteristic("calls to"), 1), ("_1c444", "token=0x600001D", capa.features.common.Characteristic("calls from"), 9), ]