Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions osprey_worker/src/osprey/engine/stdlib/udfs/string.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,28 @@ def execute(self, execution_context: ExecutionContext, arguments: StringArgument

# return any valid urls encountered in the message
return list(valid_urls)


# A token is a run of word characters, optionally followed by exactly one
# apostrophe and another run of word characters (so "don't" is one token).
_TOKEN_PATTERN = re.compile(r"[\w]+(?:'[\w]+)?", re.UNICODE)


def tokenize_text(s: str) -> list[str]:
    """Split *s* into lowercase word tokens.

    Apostrophe look-alikes are folded into the ASCII apostrophe first, then
    the text is lowercased and every match of ``_TOKEN_PATTERN`` is returned
    in order of appearance. Anything that is not a word character (or a
    single apostrophe between word characters) acts as a separator.

    Args:
        s: The text to tokenize.

    Returns:
        The list of tokens; empty when *s* contains no word characters.
    """
    # The two look-alikes are written as escapes on purpose: a literal curly
    # apostrophe is easily flattened to ASCII by copy/paste, which turns the
    # replacement into a silent no-op.
    #   U+2019 RIGHT SINGLE QUOTATION MARK, U+02BC MODIFIER LETTER APOSTROPHE
    normalized = s.replace('\u2019', "'").replace('\u02bc', "'")
    return _TOKEN_PATTERN.findall(normalized.lower())


class StringTokenize(UDFBase[StringArguments, list[str]]):
    """
    Breaks the given string into a list of lowercase tokens. Whitespace and punctuation marks
    separate tokens and are not included in the result.

    A single apostrophe inside a word does not split it (e.g. contractions): the string
    "don't go" produces ["don't", "go"]. A token is a sequence of word characters containing at
    most one internal apostrophe, so the string "do''not''go" produces ["do", "not", "go"].
    """

    category = UdfCategories.STRING

    def execute(self, execution_context: ExecutionContext, arguments: StringArguments) -> list[str]:
        # All of the work is delegated to the module-level tokenizer.
        text = arguments.s
        return tokenize_text(text)
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
StringSplit,
StringStartsWith,
StringStrip,
StringTokenize,
StringToLower,
StringToUpper,
)
Expand All @@ -39,6 +40,7 @@
StringToUpper,
StringExtractDomains,
StringExtractURLs,
StringTokenize,
)
),
]
Expand Down Expand Up @@ -319,3 +321,33 @@ def test_extract_urls(execute: ExecuteFunction, text: str, expected_result: List
result: List[str] = data['Result']
assert len(expected_result) == len(result)
assert set(expected_result) == set(result)


@pytest.mark.parametrize(
    'text,expected_result',
    [
        ('the cat in the box', ['the', 'cat', 'in', 'the', 'box']),
        ('the Cat in the bOx', ['the', 'cat', 'in', 'the', 'box']),
        ("i'm going to the store", ["i'm", 'going', 'to', 'the', 'store']),
        ('hello. where are you going? over here!', ['hello', 'where', 'are', 'you', 'going', 'over', 'here']),
        ('hello123world', ['hello123world']),
        ('test 456 test', ['test', '456', 'test']),
        ('the cat', ['the', 'cat']),
        # Doubled backslashes: the escape sequences are left for the rule source to interpret.
        ('hello\\tworld\\ntest', ['hello', 'world', 'test']),
        ('hello, world!', ['hello', 'world']),
        ('end. start', ['end', 'start']),
        ('café résumé', ['café', 'résumé']),
        # Apostrophe look-alikes are written as escapes so they cannot be
        # flattened to a plain ASCII apostrophe by copy/paste.
        ('don\u2019t', ["don't"]),  # curly apostrophe (U+2019)
        ('don\u02bct', ["don't"]),  # modifier letter apostrophe (U+02BC)
        ("don't", ["don't"]),  # plain ASCII apostrophe
        ("cat's", ["cat's"]),
        ("''hello", ['hello']),
        ("test''test", ['test', 'test']),
    ],
)
def test_tokenize(execute: ExecuteFunction, text: str, expected_result: List[str]) -> None:
    """Check that StringTokenize turns each input text into the expected token list."""
    data: Dict[str, Any] = execute(f'\nResult = StringTokenize(s="{text}")\n')

    result: List[str] = data['Result']
    # List equality already covers length and order.
    assert expected_result == result
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import pytest
from osprey.engine.conftest import ExecuteFunction
from osprey.engine.stdlib.udfs.unicode_censored import StringCheckCensored
from osprey.engine.udf.registry import UDFRegistry

# Module-wide pytest marker: the registry handed to the tests holds only the UDF under test.
pytestmark = [pytest.mark.use_udf_registry(UDFRegistry.with_udfs(StringCheckCensored))]


class TestCheckCensoredUDF:
    """Tests for the StringCheckCensored UDF."""

    def test_basic_match(self, execute: ExecuteFunction) -> None:
        """An input identical to the pattern matches."""
        outcome = execute('\nResult = StringCheckCensored(s="cat", pattern="cat")\n')
        assert outcome['Result'] is True

    def test_censored_match(self, execute: ExecuteFunction) -> None:
        """An input with a symbol substituted for a letter still matches."""
        outcome = execute('\nResult = StringCheckCensored(s="c@t", pattern="cat")\n')
        assert outcome['Result'] is True

    def test_no_match(self, execute: ExecuteFunction) -> None:
        """An unrelated word does not match."""
        outcome = execute('\nResult = StringCheckCensored(s="dog", pattern="cat")\n')
        assert outcome['Result'] is False

    def test_unicode_lookalike_match(self, execute: ExecuteFunction) -> None:
        """Look-alike Unicode letters are treated like their Latin counterparts."""
        # the middle letter is a cyrillic 'а', which looks like a latin 'a'
        cyrillic = execute('\nResult = StringCheckCensored(s="cаt", pattern="cat")\n')
        assert cyrillic['Result'] is True

        stylized = execute('\nResult = StringCheckCensored(s="𝒞𝞪𝔗", pattern="cat")\n')
        assert stylized['Result'] is True

    def test_plural_option_enabled(self, execute: ExecuteFunction) -> None:
        """`plurals` decides whether "cats" matches the pattern "cat"."""
        outcome = execute(
            '\n'
            'WithPlural = StringCheckCensored(s="cats", pattern="cat", plurals=True)\n'
            'WithoutPlural = StringCheckCensored(s="cats", pattern="cat", plurals=False)\n'
        )
        assert outcome['WithPlural'] is True
        assert outcome['WithoutPlural'] is False

    def test_substring_option(self, execute: ExecuteFunction) -> None:
        """`substrings` decides whether "cat" is found inside "concatenate"."""
        outcome = execute(
            '\n'
            'WithSubstring = StringCheckCensored(s="concatenate", pattern="cat", substrings=True)\n'
            'WithoutSubstring = StringCheckCensored(s="concatenate", pattern="cat", substrings=False)\n'
        )
        assert outcome['WithSubstring'] is True
        assert outcome['WithoutSubstring'] is False

    def test_must_be_censored_option(self, execute: ExecuteFunction) -> None:
        """With `must_be_censored`, the plain word is rejected and the obfuscated one accepted."""
        outcome = execute(
            '\n'
            'PlainText = StringCheckCensored(s="cat", pattern="cat", must_be_censored=True)\n'
            'CensoredText = StringCheckCensored(s="c@t", pattern="cat", must_be_censored=True)\n'
        )
        assert outcome['PlainText'] is False
        assert outcome['CensoredText'] is True

    def test_must_be_censored_with_surrounding_test(self, execute: ExecuteFunction) -> None:
        """Same as the `must_be_censored` check, with the word embedded in a sentence."""
        outcome = execute(
            '\n'
            'PlainText = StringCheckCensored(s="the cat sat", pattern="cat", must_be_censored=True)\n'
            'CensoredText = StringCheckCensored(s="the c@t sat", pattern="cat", must_be_censored=True)\n'
        )
        assert outcome['PlainText'] is False
        assert outcome['CensoredText'] is True

    def test_case_insensitive(self, execute: ExecuteFunction) -> None:
        """Upper- and mixed-case inputs match a lowercase pattern."""
        outcome = execute(
            '\n'
            'Upper = StringCheckCensored(s="CAT", pattern="cat")\n'
            'Mixed = StringCheckCensored(s="CaT", pattern="cat")\n'
        )
        assert outcome['Upper'] is True
        assert outcome['Mixed'] is True

    def test_with_special_chars_in_pattern(self, execute: ExecuteFunction) -> None:
        """A pattern containing a dot matches the identical input."""
        outcome = execute('\nResult = StringCheckCensored(s="a.b", pattern="a.b")\n')
        assert outcome['Result'] is True

    def test_empty_string(self, execute: ExecuteFunction) -> None:
        """An empty input never matches."""
        outcome = execute('\nResult = StringCheckCensored(s="", pattern="cat")\n')
        assert outcome['Result'] is False

    @pytest.mark.parametrize(
        'input_str,pattern,expected',
        [
            ('hello world', 'hello', True),
            ('h3ll0', 'hello', True),
            ('h e l l o', 'hello', False),
            ('HELLO', 'hello', True),
            ('dog', 'cat', False),
            ('', 'test', False),
            ('c@t', 'cat', True),
            ('h4ck3r', 'hacker', True),
            ('p@$$w0rd', 'password', True),
            ('t35t', 'test', True),
            ('1337', 'leet', True),
            ('n00b', 'noob', True),
            ('ph1sh', 'phish', True),
            ('саt', 'cat', True),
            ('руthоn', 'python', True),
            ('НЕLLО', 'hello', True),
            ('Ηello', 'hello', True),
            ('Αpple', 'apple', True),
            ('Βank', 'bank', True),
            ('Κing', 'king', True),
            ('Νice', 'nice', True),
            ('Οpen', 'open', True),
            ('Ρython', 'python', True),
            ('Τest', 'test', True),
            ('Χmas', 'xmas', True),
            ('Υes', 'yes', True),
            ('Ζero', 'zero', True),
            ('𝐜𝐚𝐭', 'cat', True),
            ('𝑐𝑎𝑡', 'cat', True),
            ('𝒄𝒂𝒕', 'cat', True),
            ('𝓬𝓪𝓽', 'cat', True),
            ('𝔠𝔞𝔱', 'cat', True),
            ('𝕔𝕒𝕥', 'cat', True),
            ('𝖈𝖆𝖙', 'cat', True),
            ('𝗰𝗮𝘁', 'cat', True),
            ('𝘤𝘢𝘵', 'cat', True),
            ('𝙘𝙖𝙩', 'cat', True),
            ('𝚌𝚊𝚝', 'cat', True),
            ('cat', 'cat', True),
            ('hello', 'hello', True),
            ('HELLO', 'hello', True),
            ('ᑕᗩT', 'cat', True),
            ('ꓚꓮT', 'cat', True),
            ('ⲤⲀT', 'cat', True),
            ('ԁоg', 'dog', True),
            ('bаnk', 'bank', True),
            ('pаypаl', 'paypal', True),
            ('аmаzоn', 'amazon', True),
            ('s3cur1ty', 'security', True),
            ('4dm1n', 'admin', True),
            ('r00t', 'root', True),
            ('z3r0', 'zero', True),
            ('0n3', 'one', True),
            ('tw0', 'two', True),
            ('с@т', 'cat', True),
            ('ρ@$$ωθrd', 'password', True),
            ('𝕙𝕒𝕔𝕜', 'hack', True),
            ('հello', 'hello', True),
            ('ոice', 'nice', True),
            ('քhone', 'phone', True),
            ('ցame', 'game', True),
            ('(at', 'cat', True),
            ('<at', 'cat', True),
            ('ca+', 'cat', True),
            ('ca7', 'cat', True),
            ('he!!o', 'hello', True),
            ('he||o', 'hello', True),
            ('c.a.t', 'cat', True),
            ('c_a_t', 'cat', True),
            ('c+a+t', 'cat', True),
            ('c/a/t', 'cat', True),
            ('c@a@t', 'cat', True),
            ('c#a#t', 'cat', True),
            # zero-width and invisible chars as separators
            ('c\u200bat', 'cat', True),
            ('c\u200cat', 'cat', True),
            ('c\u200dat', 'cat', True),
            ('ca\u200et', 'cat', True),
            ('ca\u200ft', 'cat', True),
            ('c\ufeffat', 'cat', True),
            # multiple zero-width chars
            ('c\u200b\u200bat', 'cat', True),
            ('c\u200b\u200c\u200dat', 'cat', True),
            # zero-width between every letter
            ('c\u200ba\u200bt', 'cat', True),
            ('h\u200be\u200bl\u200bl\u200bo', 'hello', True),
            ('c\u200b_a\u200c_t', 'cat', True),
            ('c.\u200ba.\u200bt', 'cat', True),
        ],
    )
    def test_various_inputs(self, execute: ExecuteFunction, input_str: str, pattern: str, expected: bool) -> None:
        """Table-driven check of one input against one pattern, using the UDF's default options."""
        outcome = execute(f'\nResult = StringCheckCensored(s="{input_str}", pattern="{pattern}")\n')
        assert outcome['Result'] is expected

    def test_leet_speak_variations(self, execute: ExecuteFunction) -> None:
        """Digit-for-letter substitutions match the plain word."""
        outcome = execute(
            '\n'
            'Leet1 = StringCheckCensored(s="h3ll0", pattern="hello")\n'
            'Leet2 = StringCheckCensored(s="t35t", pattern="test")\n'
        )
        assert outcome['Leet1'] is True
        assert outcome['Leet2'] is True

    def test_with_separator_characters(self, execute: ExecuteFunction) -> None:
        """Dots and underscores between letters do not prevent a match when `substrings` is on."""
        outcome = execute(
            '\n'
            'Dots = StringCheckCensored(s="c.a.t", pattern="cat", substrings=True)\n'
            'Underscores = StringCheckCensored(s="c_a_t", pattern="cat", substrings=True)\n'
            'Mixed = StringCheckCensored(s="c.a_t", pattern="cat", substrings=True)\n'
        )
        assert outcome['Dots'] is True
        assert outcome['Underscores'] is True
        assert outcome['Mixed'] is True
Loading
Loading