Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions aikido_zen/vulnerabilities/sql_injection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
"""

import re
import ctypes
from aikido_zen.helpers.logging import logger
from .map_dialect_to_rust_int import map_dialect_to_rust_int
from .get_lib_path import get_binary_path
from ...helpers.encode_safely import encode_safely
from .zen_internal_ffi import ZenInternal


def detect_sql_injection(query, user_input, dialect):
Expand All @@ -20,32 +17,7 @@ def detect_sql_injection(query, user_input, dialect):
if should_return_early(query_l, userinput_l):
return False

internals_lib = ctypes.CDLL(get_binary_path())
internals_lib.detect_sql_injection.argtypes = [
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
ctypes.c_int,
]
internals_lib.detect_sql_injection.restype = ctypes.c_int

# Parse input variables for rust function
query_bytes = encode_safely(query_l)
userinput_bytes = encode_safely(userinput_l)
query_buffer = (ctypes.c_uint8 * len(query_bytes)).from_buffer_copy(query_bytes)
userinput_buffer = (ctypes.c_uint8 * len(userinput_bytes)).from_buffer_copy(
userinput_bytes
)
dialect_int = map_dialect_to_rust_int(dialect)

c_int_res = internals_lib.detect_sql_injection(
query_buffer,
len(query_bytes),
userinput_buffer,
len(userinput_bytes),
dialect_int,
)
c_int_res = ZenInternal().detect_sql_injection(query_l, userinput_l, dialect)

# This means that an error occurred in the library
if c_int_res == 2:
Expand Down

This file was deleted.

57 changes: 57 additions & 0 deletions aikido_zen/vulnerabilities/sql_injection/zen_internal_ffi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
Interface for calling zen-internal shared library
"""

import ctypes
import threading
from .get_lib_path import get_binary_path
from ...helpers.encode_safely import encode_safely


class __Singleton(type):
_instances = {}
_lock = threading.Lock() # Ensures thread safety

def __call__(cls, *args, **kwargs):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remain undecided. While I appreciate your insightful observation regarding the contentious nature of metaclasses, in this specific instance—where the singleton pattern is widely recognized—it may be acceptable.

Ultimately, the choice of which singleton implementation (each has its own drawbacks) to adopt is largely a matter of personal preference and not probabilistic exercise. Therefore, I would value the perspective of a real human being on this matter.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call method implements complex singleton pattern with thread safety using atomic check-and-create operations, but lacks explanatory comments to help developers understand the locking mechanism and why it's necessary for thread-safe singleton creation. More info

with cls._lock: # Lock to make the check-and-create operation atomic
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]


class ZenInternal(metaclass=__Singleton):
# Reference : [rust lib]/src/sql_injection/helpers/select_dialect_based_on_enum.rs
SQL_DIALECTS = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have helper function for this btw

Copy link
Contributor Author

@isimluk isimluk Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean map_dialect_to_rust_int?

I took that down ✂️ in the second commit.

I felt that this fn carried intimate knowledge of the FFI/zen-internal and thus should be part of the class that deals with these matters. Further, I decided to expand the method and leave only the map in there as I saw that these things are contextually well collocated and there is no need for it to exist as separate fn.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, generally we just don't like to refactor stuff & the helpers can include logic specific to parts of code, it's just a way of abstracting away more tedious stuff that rarely changes (like this map). For me it's fine either way, but it's easier to leave it as is and cut down on the refactors (at least that's my opinion).

"generic": 0,
"clickhouse": 3,
"mysql": 8,
"postgres": 9,
"sqlite": 12,
}

def __init__(self):
self._lib = ctypes.CDLL(get_binary_path())
self._lib.detect_sql_injection.argtypes = [
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
ctypes.c_int,
]
self._lib.detect_sql_injection.restype = ctypes.c_int

def detect_sql_injection(self, query, user_input, dialect):
query_bytes = encode_safely(query)
userinput_bytes = encode_safely(user_input)
query_buffer = (ctypes.c_uint8 * len(query_bytes)).from_buffer_copy(query_bytes)
userinput_buffer = (ctypes.c_uint8 * len(userinput_bytes)).from_buffer_copy(
userinput_bytes
)
dialect_int = self.SQL_DIALECTS[dialect]
return self._lib.detect_sql_injection(
query_buffer,
len(query_bytes),
userinput_buffer,
len(userinput_bytes),
dialect_int,
)