Skip to content
24 changes: 24 additions & 0 deletions benchmarks/appsec_iast_aspects/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,30 @@ join_noaspect:
<<: *join_aspect
function_name: "join_noaspect"

strip_aspect: &strip_aspect
warmups: 1
function_name: "iast_strip_aspect"

strip_noaspect:
<<: *strip_aspect
function_name: "strip_noaspect"

rstrip_aspect: &rstrip_aspect
warmups: 1
function_name: "iast_rstrip_aspect"

rstrip_noaspect:
<<: *rstrip_aspect
function_name: "rstrip_noaspect"

lstrip_aspect: &lstrip_aspect
warmups: 1
function_name: "iast_lstrip_aspect"

lstrip_noaspect:
<<: *lstrip_aspect
function_name: "lstrip_noaspect"

lower_aspect: &lower_aspect
warmups: 1
function_name: "iast_lower_aspect"
Expand Down
31 changes: 27 additions & 4 deletions benchmarks/appsec_iast_aspects/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,16 @@
"modulo_aspect",
"replace_aspect",
"repr_aspect",
"rsplit_aspect",
"slice_aspect",
"split_aspect",
"split_aspect",
"splitlines_aspect",
"str_aspect",
"stringio_aspect",
"swapcase_aspect",
"title_aspect",
"translate_aspect",
"upper_aspect",
"rstrip_aspect",
"lstrip_aspect",
"strip_aspect",
]

notfound_symbols = []
Expand Down Expand Up @@ -454,3 +453,27 @@ def iast_split_aspect():

def split_noaspect():
return "foo bar baz".split()


def iast_strip_aspect():
return strip_aspect(None, 1, " foo bar baz ") # noqa: F821


def strip_noaspect():
return " foo bar baz ".strip()


def iast_rstrip_aspect():
return rstrip_aspect(None, 1, " foo bar baz ") # noqa: F821


def rstrip_noaspect():
return " foo bar baz ".rstrip()


def iast_lstrip_aspect():
return lstrip_aspect(None, 1, " foo bar baz ") # noqa: F821


def lstrip_noaspect():
return " foo bar baz ".lstrip()
3 changes: 3 additions & 0 deletions ddtrace/appsec/_iast/_ast/visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def _mark_avoid_convert_recursively(node):
"split": _PREFIX + "aspects.split_aspect", # Both regular split and re.split
"rsplit": _PREFIX + "aspects.rsplit_aspect",
"splitlines": _PREFIX + "aspects.splitlines_aspect",
"lstrip": _PREFIX + "aspects.lstrip_aspect",
"rstrip": _PREFIX + "aspects.rstrip_aspect",
"strip": _PREFIX + "aspects.strip_aspect",
# re module and re.Match methods
"findall": _PREFIX + "aspects.re_findall_aspect",
"finditer": _PREFIX + "aspects.re_finditer_aspect",
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/appsec/_iast/_patch_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def patch_iast(patch_modules=IAST_PATCH):
when_imported("werkzeug.utils")(
lambda _: try_wrap_function_wrapper("werkzeug.utils", "secure_filename", path_traversal_sanitizer)
)
# TODO: werkzeug.utils.safe_join propagation doesn't work because strip("._") which is not yet supported by IAST
# TODO: werkzeug.utils.safe_join propagation doesn't work because normpath which is not yet supported by IAST
# when_imported("werkzeug.utils")(
# lambda _: try_wrap_function_wrapper(
# "werkzeug.utils",
Expand Down
122 changes: 122 additions & 0 deletions ddtrace/appsec/_iast/_taint_tracking/aspects.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@
"slice_aspect",
"split_aspect",
"splitlines_aspect",
"lstrip_aspect",
"rstrip_aspect",
"strip_aspect",
"str_aspect",
"stringio_aspect",
"swapcase_aspect",
Expand Down Expand Up @@ -1312,3 +1315,122 @@ def ospathsplitroot_aspect(*args: Any, **kwargs: Any) -> Any:
iast_propagation_error_log(f"_aspect_ospathsplitroot. {e}")

return os.path.splitroot(*args, **kwargs) # type: ignore[attr-defined]


def lstrip_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> TEXT_TYPES:
if orig_function is not None and not isinstance(orig_function, BuiltinFunctionType):
if flag_added_args > 0:
args = args[flag_added_args:]
return orig_function(*args, **kwargs)

candidate_text = args[0]
args = args[flag_added_args:]

result = candidate_text.lstrip(*args, **kwargs)

if not isinstance(candidate_text, IAST.TEXT_TYPES):
return result

try:
_strip_lstrip_aspect(candidate_text, result)
return result
except Exception as e:
iast_propagation_error_log(f"lstrip_aspect. {e}")

return result


def rstrip_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> TEXT_TYPES:
if orig_function is not None and not isinstance(orig_function, BuiltinFunctionType):
if flag_added_args > 0:
args = args[flag_added_args:]
return orig_function(*args, **kwargs)

candidate_text = args[0]
args = args[flag_added_args:]

result = candidate_text.rstrip(*args, **kwargs)

if not isinstance(candidate_text, IAST.TEXT_TYPES):
return result

try:
ranges_new: List[TaintRange] = []
ranges_new_append = ranges_new.append

ranges = get_ranges(candidate_text)
len_result = len(result)
if len_result == len(candidate_text):
taint_pyobject_with_ranges(result, tuple(ranges))
else:
for taint_range in ranges:
if taint_range.start >= len_result:
continue

new_length = min(len_result - taint_range.start, taint_range.length)
new_range = TaintRange(
start=taint_range.start,
length=new_length,
source=taint_range.source,
secure_marks=taint_range.secure_marks,
)
ranges_new_append(new_range)
taint_pyobject_with_ranges(result, tuple(ranges_new))
return result
except Exception as e:
iast_propagation_error_log(f"rstrip_aspect. {e}")

return result


def strip_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> TEXT_TYPES:
if orig_function is not None and not isinstance(orig_function, BuiltinFunctionType):
if flag_added_args > 0:
args = args[flag_added_args:]
return orig_function(*args, **kwargs)

candidate_text = args[0]
args = args[flag_added_args:]
result = candidate_text.strip(*args, **kwargs)

if not isinstance(candidate_text, IAST.TEXT_TYPES):
return result

try:
_strip_lstrip_aspect(candidate_text, result)
return result
except Exception as e:
iast_propagation_error_log(f"strip_aspect. {e}")

return result


def _strip_lstrip_aspect(candidate_text, result):
ranges_new: List[TaintRange] = []
ranges = get_ranges(candidate_text)
start_pos = candidate_text.index(result)
len_result = len(result)
end_pos = start_pos + len_result
if len_result != len(candidate_text):
for taint_range in ranges:
range_start = taint_range.start
range_end = range_start + taint_range.length

if range_end <= start_pos or range_start >= end_pos:
continue

# Calculate new range boundaries
new_start = max(range_start - start_pos, 0)
new_end = min(range_end - start_pos, len_result)
new_length = new_end - new_start

if new_length > 0:
# Create a new range with adjusted position and length
new_range = TaintRange(
start=new_start,
length=new_length,
source=taint_range.source,
secure_marks=taint_range.secure_marks,
)
ranges_new.append(new_range)
taint_pyobject_with_ranges(result, tuple(ranges_new))
10 changes: 6 additions & 4 deletions scripts/iast/mod_leak_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,16 @@ async def test_doit():
string12 = string11 + "_notainted"
string13 = string12.rsplit("_", 1)[0]
string13_2 = string13 + " " + string13
string13_3 = string13_2.strip()
string13_4 = string13_3.rstrip()
string13_5 = string13_4.lstrip()
try:
string13_3, string13_5, string13_5 = string13_2.split(" ")
string13_5_1, string13_5_2, string13_5_3 = string13_5.split(" ")
except ValueError:
pass
sink_points(string13_2)

sink_points(string13_5)
# os path propagation
string14 = os.path.join(string13_2, "a")
string14 = os.path.join(string13_5, "a")
string15 = os.path.split(string14)[0]
string16 = os.path.dirname(string15 + "/" + "foobar")
string17 = os.path.basename("/foobar/" + string16)
Expand Down
Loading
Loading