Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9a2e531
Add osprey.whenrules_completeness metric
cmttt Mar 25, 2026
6a39052
Merge pull request #1 from cmttt/ls/action-errors-by-type
cmttt Mar 25, 2026
cb06090
[osprey] Add rule audit logging and trace_id to ExecutionResult
cmttt Mar 25, 2026
6b84d9d
Address review feedback: add_slots, remove getattr, add safety comment
cmttt Mar 25, 2026
fb92dab
Merge pull request #2 from cmttt/ls/rule-audit-logging
cmttt Mar 26, 2026
31ccdd2
Add safe division to prevent ZeroDivisionError in rule evaluation
cmttt Mar 26, 2026
1cdf071
Merge pull request #3 from cmttt/ls/fix-safe-division
cmttt Mar 26, 2026
f6b4915
Change coerce_type default from False to True for JsonData/EntityJson
cmttt Mar 26, 2026
cd99ce2
Merge pull request #4 from cmttt/ls/coerce-type-default-true
cmttt Mar 26, 2026
0bff7b6
Merge branch 'roostorg:main' into main
cmttt Mar 27, 2026
1f4f9c3
[osprey] Add osprey_async_worker package for asyncio-based worker
cmttt Mar 30, 2026
ea204e3
[osprey] Add worker_type metric tag to all executor/sink metrics
cmttt Mar 30, 2026
1c2ad81
[osprey] Add BigQuery shadow sink for async worker validation
cmttt Mar 30, 2026
7210b40
[osprey] Add async plugin system (osprey_async_plugin entry_point)
cmttt Mar 30, 2026
a036c1e
[osprey] Add native async UDF support to async executor
cmttt Mar 30, 2026
b5a2100
[osprey] Simplify async executor: no thread pool, clean UDF model
cmttt Mar 30, 2026
f514c33
[osprey] Move worker_type tag to constant_tags on metrics singleton
cmttt Mar 31, 2026
c712690
[osprey] Add tests for async executor and sink infrastructure
cmttt Mar 31, 2026
306d1ae
[osprey] Benchmark: async executor is 2x faster for sync UDFs
cmttt Mar 31, 2026
905e7ec
[osprey] Add legacy fallback to async executor for unported UDFs
cmttt Mar 31, 2026
2826951
[osprey] Add async infrastructure: pigeon, coordinator, etcd, externa…
cmttt Mar 31, 2026
564f8cc
Merge pull request #5 from cmttt/ls/async-worker
cmttt Mar 31, 2026
e4a4e90
[osprey] Add AsyncOspreyEngine — remove gevent dependency from async …
cmttt Mar 31, 2026
63a0627
[osprey] Don't load sync plugins by default in async bootstrap
cmttt Mar 31, 2026
450b406
[osprey] Always load stdlib UDFs and validators in async bootstrap
cmttt Mar 31, 2026
6f440f5
[osprey] Gracefully handle helper creation failures in async bootstrap
cmttt Mar 31, 2026
19559c4
Merge pull request #6 from cmttt/ls/async-engine-improvements
cmttt Mar 31, 2026
0e1f589
[osprey] Add from_direct_address factory to OspreyCoordinatorInputStream
cmttt Mar 31, 2026
d076e28
[osprey] Fix AsyncUDFBase TypeVar resolution — reuse UDFBase's TypeVars
cmttt Mar 31, 2026
9d15d04
[osprey] Fix AsyncUDFBase._get_udf_base_args — resolve concrete types
cmttt Mar 31, 2026
4befd75
Merge pull request #7 from cmttt/ls/coordinator-direct-addr
cmttt Mar 31, 2026
5543e07
[osprey] Add AsyncPubSubPublisher with async batching, remove BQ shad…
cmttt Mar 31, 2026
313d620
Merge pull request #9 from cmttt/ls/fix-bq-sink-publisher
cmttt Mar 31, 2026
1760141
Add register_validation_exporter plugin hook
cmttt Mar 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions osprey_async_worker/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "osprey-async-worker"
version = "0.1.0"
description = "Asyncio-based Osprey worker"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"osprey_rpc",
"osprey_worker",
]

[project.scripts]
osprey-async-cli = "osprey.async_worker.cli.main:cli"

[project.entry-points."osprey_async_plugin"]

[tool.setuptools]
include-package-data = true

[tool.setuptools.package-data]
"*" = ["*.json", "*.yaml", "*.yml"]
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Pluggy project name for the async worker. The same string is passed to the
# hookspec/hookimpl markers and the PluginManager, and is used as the
# setuptools entry_point group from which async plugins are loaded.
OSPREY_ASYNC_ADAPTOR = 'osprey_async_plugin'
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Hook specifications for the async worker plugin system.

Mirrors osprey.worker.adaptor.hookspecs but uses the 'osprey_async_plugin'
entry_point group. Plugins register async output sinks and UDFs here.

UDFs are shared with the sync worker (they're registered via the existing
'osprey_plugin' hooks and wrapped with SyncUDFAdapter). Async-native UDFs
can also be registered here.

Output sinks MUST be async (AsyncBaseOutputSink) since the async worker
doesn't use gevent.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Sequence, Type

import pluggy
from osprey.engine.ast_validator.base_validator import BaseValidator
from osprey.engine.udf.base import UDFBase

from osprey.async_worker.adaptor.constants import OSPREY_ASYNC_ADAPTOR
from osprey.async_worker.adaptor.interfaces import AsyncBaseOutputSink

if TYPE_CHECKING:
from osprey.worker.lib.config import Config

# Decorator that marks the functions below as hook specifications for the
# OSPREY_ASYNC_ADAPTOR pluggy project.
hookspec: pluggy.HookspecMarker = pluggy.HookspecMarker(OSPREY_ASYNC_ADAPTOR)


@hookspec
def register_async_output_sinks(config: Config) -> Sequence[AsyncBaseOutputSink]:
    """Hook: contribute output sinks to the async worker.

    Args:
        config: Worker configuration handed to every plugin implementation.

    Returns:
        A sequence of ``AsyncBaseOutputSink`` instances. Sync
        ``BaseOutputSink`` objects are not accepted here; each returned sink
        is driven with ``await sink.push(result)``.

    Raises:
        NotImplementedError: Always, when this specification itself is
            called; plugins supply the real implementation.
    """
    raise NotImplementedError


@hookspec
def register_udfs() -> Sequence[Type[UDFBase[Any, Any]]]:
    """Hook: contribute UDF classes to the async worker.

    Implementations return ``UDFBase`` subclasses, the same type the sync
    worker's plugins register. Native async UDFs (``AsyncUDFBase`` /
    ``AsyncBatchableUDFBase``) are ``UDFBase`` subclasses as well, so they
    are registered through this same hook.

    Returns:
        A sequence of UDF classes (not instances).

    Raises:
        NotImplementedError: Always, when this specification itself is
            called; plugins supply the real implementation.
    """
    raise NotImplementedError


@hookspec
def register_ast_validators() -> Sequence[Type[BaseValidator]]:
    """Hook: contribute AST validator classes.

    Returns:
        A sequence of ``BaseValidator`` subclasses (classes, not instances).

    Raises:
        NotImplementedError: Always, when this specification itself is
            called; plugins supply the real implementation.
    """
    raise NotImplementedError
160 changes: 160 additions & 0 deletions osprey_async_worker/src/osprey/async_worker/adaptor/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Async plugin interfaces for the osprey async worker.

AsyncUDFBase extends UDFBase so it works with the existing engine machinery
(UDFRegistry, CallExecutor, validation, type checking, argument resolution).

All I/O UDFs in the async worker MUST be AsyncUDFBase subclasses — existing
sync UDFs that use gevent primitives will not work without monkey patching.
Pure-computation UDFs (no I/O) can remain as regular UDFBase and run inline.
"""

import abc
import asyncio
from typing import Any, ClassVar, Generic, Sequence, Tuple, TypeVar

from osprey.engine.executor.execution_context import ExecutionContext, ExecutionResult
from osprey.engine.udf.base import (
Arguments,
BatchableArguments,
BatchableUDFBase,
RValue,
UDFBase,
)
from result import Result

# NOTE(review): _T is not referenced anywhere in the visible part of this
# module — confirm it is still needed before keeping it.
_T = TypeVar('_T')


class AsyncUDFBase(UDFBase[Arguments, RValue]):
    """Base class for UDFs whose work is done in a coroutine.

    Subclassing ``UDFBase`` keeps these UDFs compatible with everything that
    already operates on ``UDFBase`` types. Subclasses implement
    ``async_execute()``; the synchronous ``execute()`` deliberately raises so
    that a native async UDF cannot be run through a sync code path by
    accident.
    """

    # Class-level flags; they are read by code outside this module.
    execute_async: ClassVar[bool] = True
    is_native_async: ClassVar[bool] = True

    def __init__(self, validation_context, arguments):
        # Pure pass-through to UDFBase's constructor.
        super().__init__(validation_context, arguments)

    @classmethod
    def _get_udf_base_args(cls):
        """Return the concrete type arguments this UDF binds on its UDF base.

        Walks the MRO and inspects every generic base. A base qualifies when
        its origin is ``UDFBase``/``AsyncUDFBase`` or any class with
        ``UDFBase`` in its own MRO. The first qualifying base whose arguments
        are non-empty and contain no unresolved ``TypeVar`` wins; if none is
        found the parent implementation decides.
        """
        import typing_inspect

        def _is_udf_origin(candidate) -> bool:
            if candidate in (UDFBase, AsyncUDFBase):
                return True
            return hasattr(candidate, '__mro__') and UDFBase in candidate.__mro__

        for klass in cls.__mro__:
            for generic_base in typing_inspect.get_generic_bases(klass):
                if not _is_udf_origin(typing_inspect.get_origin(generic_base)):
                    continue
                type_args = typing_inspect.get_args(generic_base)
                if not type_args:
                    continue
                # Skip bases that are still parameterised by TypeVars.
                if any(isinstance(arg, TypeVar) for arg in type_args):
                    continue
                return type_args

        # Nothing concrete found on the MRO: defer to UDFBase's own lookup.
        return super()._get_udf_base_args()

    def execute(self, execution_context: ExecutionContext, arguments: Arguments) -> RValue:
        """Always raise: native async UDFs must go through ``async_execute()``."""
        message = (
            f'{self.__class__.__name__} is a native async UDF. '
            f'Use async_execute() instead of execute().'
        )
        raise RuntimeError(message)

    @abc.abstractmethod
    async def async_execute(self, execution_context: ExecutionContext, arguments: Arguments) -> RValue:
        """Coroutine holding the UDF's logic; concrete subclasses must override it."""
        raise NotImplementedError


class AsyncBatchableUDFBase(BatchableUDFBase[Arguments, RValue, BatchableArguments]):
    """Base class for batchable UDFs whose work is done in coroutines.

    The batchable counterpart of ``AsyncUDFBase``: subclasses implement
    ``async_execute()`` and ``async_execute_batch()``, while the synchronous
    ``execute()`` and ``execute_batch()`` entry points raise.
    """

    # Class-level flag; it is read by code outside this module.
    is_native_async: ClassVar[bool] = True

    def execute(self, execution_context: ExecutionContext, arguments: Arguments) -> RValue:
        """Always raise: the sync single-item path is not supported."""
        message = (
            f'{self.__class__.__name__} is a native async UDF. '
            f'Use async_execute() instead of execute().'
        )
        raise RuntimeError(message)

    def execute_batch(
        self,
        execution_context: ExecutionContext,
        udfs: Sequence[UDFBase[Any, Any]],
        arguments: Sequence[BatchableArguments],
    ) -> Sequence[Result[RValue, Exception]]:
        """Always raise: the sync batch path is not supported."""
        message = (
            f'{self.__class__.__name__} is a native async UDF. '
            f'Use async_execute_batch() instead of execute_batch().'
        )
        raise RuntimeError(message)

    @abc.abstractmethod
    async def async_execute(self, execution_context: ExecutionContext, arguments: Arguments) -> RValue:
        """Coroutine for a single invocation; concrete subclasses must override it."""
        raise NotImplementedError

    @abc.abstractmethod
    async def async_execute_batch(
        self,
        execution_context: ExecutionContext,
        udfs: Sequence[UDFBase[Any, Any]],
        arguments: Sequence[BatchableArguments],
    ) -> Sequence[Result[RValue, Exception]]:
        """Coroutine for a batch of invocations; concrete subclasses must override it."""
        raise NotImplementedError


# --- Output sinks and input streams (unchanged) ---


class AsyncBaseOutputSink(abc.ABC):
    """Abstract interface for a destination that receives execution results.

    Concrete sinks implement ``will_do_work()`` (a synchronous pre-check) and
    the ``push()`` coroutine. ``stop()`` is an optional shutdown hook.
    """

    # Seconds allowed for a single push(); AsyncMultiOutputSink applies this
    # as the timeout around each child sink's push().
    timeout: float = 2.0
    # NOTE(review): not read anywhere in the visible code — confirm who
    # consumes this before relying on it.
    max_retries: int = 0

    @abc.abstractmethod
    def will_do_work(self, result: ExecutionResult) -> bool:
        """Report whether ``push()`` should be called for ``result``."""
        raise NotImplementedError

    @abc.abstractmethod
    async def push(self, result: ExecutionResult) -> None:
        """Deliver ``result`` to this sink."""
        raise NotImplementedError

    async def stop(self) -> None:
        """Shutdown hook; the default implementation does nothing."""
        return None


class AsyncMultiOutputSink(AsyncBaseOutputSink):
    """Tees execution results to multiple async output sinks.

    Child sinks are isolated from one another: a sink that times out or
    raises is logged and skipped so the remaining sinks still receive the
    result (``push``) or still get shut down (``stop``).
    """

    def __init__(self, sinks: Sequence[AsyncBaseOutputSink]):
        """Store the child sinks; they are driven in the order given."""
        self._sinks = sinks

    @staticmethod
    def _logger():
        """Return this module's logger (imported lazily, only on failure paths)."""
        import logging

        return logging.getLogger(__name__)

    def will_do_work(self, result: ExecutionResult) -> bool:
        """Return True if at least one child sink wants ``result``."""
        return any(sink.will_do_work(result) for sink in self._sinks)

    async def push(self, result: ExecutionResult) -> None:
        """Push ``result`` to every child sink that wants it, one at a time.

        Each push is bounded by that sink's ``timeout``. Timeouts and other
        exceptions are logged and never propagated, so one failing sink
        cannot prevent delivery to the others. Cancellation is not caught.
        """
        for sink in self._sinks:
            sink_name = type(sink).__name__
            try:
                # The pre-check lives inside the guard too: a sink whose
                # will_do_work() raises must not abort the remaining sinks.
                if not sink.will_do_work(result):
                    continue
                async with asyncio.timeout(sink.timeout):
                    await sink.push(result)
            except TimeoutError:
                self._logger().warning(
                    'Output sink %s timed out after %ss', sink_name, sink.timeout
                )
            except Exception:
                self._logger().exception('Output sink %s failed to push result', sink_name)

    async def stop(self) -> None:
        """Stop every child sink, even if some of them fail.

        Raises:
            Exception: The first error raised by a child's ``stop()``,
                re-raised only after all sinks have been given the chance to
                stop.
        """
        first_error = None
        for sink in self._sinks:
            try:
                await sink.stop()
            except Exception as exc:
                self._logger().exception('Output sink %s failed to stop', type(sink).__name__)
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Plugin manager for the async worker.

Discovers plugins via the 'osprey_async_plugin' setuptools entry_point group.
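Sync plugins from 'osprey_plugin' (UDFs and AST validators) are loaded only
when a bootstrap function is called with load_sync_plugins=True; they are
registered here unmodified.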
"""

from __future__ import annotations

from functools import lru_cache
from typing import TYPE_CHECKING, Any, List, Sequence, Type

import pluggy
from osprey.engine.ast_validator import ValidatorRegistry
from osprey.engine.executor.udf_execution_helpers import HasHelper, UDFHelpers
from osprey.engine.udf.base import UDFBase
from osprey.engine.udf.registry import UDFRegistry

from osprey.async_worker.adaptor import hookspecs as async_hookspecs
from osprey.async_worker.adaptor.constants import OSPREY_ASYNC_ADAPTOR
from osprey.async_worker.adaptor.interfaces import AsyncBaseOutputSink, AsyncMultiOutputSink

if TYPE_CHECKING:
from osprey.worker.lib.config import Config

# Decorator plugins use to mark their implementations of the async hookspecs.
hookimpl_osprey_async: pluggy.HookimplMarker = pluggy.HookimplMarker(OSPREY_ASYNC_ADAPTOR)

# Module-level plugin manager for the async worker, with the async hook
# specifications registered at import time.
plugin_manager = pluggy.PluginManager(OSPREY_ASYNC_ADAPTOR)
plugin_manager.add_hookspecs(async_hookspecs)


def _flatten(seq: List[List[Any]]) -> List[Any]:
return sum(seq, [])


@lru_cache(maxsize=1)
def load_all_async_plugins() -> None:
    """Discover and load plugins from the 'osprey_async_plugin' entry_point group.

    The ``lru_cache`` wrapper memoises the (argument-less) call, so after one
    successful run later calls return immediately without loading again.
    """
    # Register everything advertised under the entry_point group ...
    plugin_manager.load_setuptools_entrypoints(OSPREY_ASYNC_ADAPTOR)
    # ... then let pluggy validate the registered hook implementations.
    plugin_manager.check_pending()


def bootstrap_async_udfs(load_sync_plugins: bool = False) -> tuple[UDFRegistry, UDFHelpers]:
    """Bootstrap UDFs from stdlib + async plugins (and optionally sync plugins).

    Stdlib UDFs (JsonData, StringLength, Rule, etc.) are always loaded since
    they're needed for basic rule compilation. UDF classes are de-duplicated
    across all sources, keeping the first occurrence, so a class contributed
    by more than one source is registered once.

    Args:
        load_sync_plugins: When True, also load UDFs from the sync
            'osprey_plugin' plugins (triggers gevent side effects).

    Returns:
        A ``(UDFRegistry, UDFHelpers)`` pair. Helpers whose provider could
        not be created are omitted; the failure is logged.
    """
    import logging

    # Always load stdlib UDFs — they don't trigger gevent side effects
    from osprey.worker._stdlibplugin.udf_register import register_udfs as stdlib_register_udfs

    logger = logging.getLogger(__name__)

    load_all_async_plugins()
    udf_helpers = UDFHelpers()

    # Source order matters for de-duplication: stdlib, async plugins, sync plugins.
    candidates: List[Type[UDFBase[Any, Any]]] = list(stdlib_register_udfs())
    candidates.extend(_flatten(plugin_manager.hook.register_udfs()))

    if load_sync_plugins:
        from osprey.worker.adaptor.plugin_manager import load_all_osprey_plugins, plugin_manager as sync_plugin_manager

        load_all_osprey_plugins()
        candidates.extend(_flatten(sync_plugin_manager.hook.register_udfs()))

    # dict.fromkeys keeps first-seen order while dropping repeated classes.
    all_udfs: List[Type[UDFBase[Any, Any]]] = list(dict.fromkeys(candidates))

    for udf in all_udfs:
        if not issubclass(udf, HasHelper):
            continue
        try:
            udf_helpers.set_udf_helper(udf, udf.create_provider())
        except Exception:
            # Skip helper creation for UDFs that fail (e.g., etcd not available).
            # These UDFs will fail at execution time via the legacy fallback,
            # which is expected — errors get captured in error_infos.
            # Still log it, so a missing helper can be diagnosed.
            logger.warning(
                'Skipping helper for UDF %s: provider creation failed',
                udf.__name__,
                exc_info=True,
            )

    udf_registry = UDFRegistry.with_udfs(*all_udfs)
    return udf_registry, udf_helpers


def bootstrap_async_output_sinks(config: Config) -> AsyncMultiOutputSink:
    """Build the combined output sink from async plugins.

    Only the 'register_async_output_sinks' hook of the async plugin manager
    is consulted; sync output sinks are never loaded here.

    Args:
        config: Worker configuration forwarded to every plugin's hook.

    Returns:
        An ``AsyncMultiOutputSink`` wrapping every sink the plugins returned.
    """
    load_all_async_plugins()
    per_plugin_sinks = plugin_manager.hook.register_async_output_sinks(config=config)
    collected: List[AsyncBaseOutputSink] = _flatten(per_plugin_sinks)
    return AsyncMultiOutputSink(collected)


def bootstrap_async_ast_validators(load_sync_plugins: bool = False) -> None:
    """Register AST validators from stdlib + async plugins on the shared registry.

    Stdlib validators (ValidateCallKwargs, etc.) are always included because
    rule compilation needs them. Each validator class is registered once, in
    first-seen order.

    Args:
        load_sync_plugins: When True, validators from the sync
            'osprey_plugin' plugins are appended as well.
    """
    # Always load stdlib validators — they don't trigger gevent side effects
    from osprey.worker._stdlibplugin.validator_regsiter import register_ast_validators as stdlib_register_validators

    load_all_async_plugins()

    collected = list(stdlib_register_validators())
    collected.extend(_flatten(plugin_manager.hook.register_ast_validators()))

    if load_sync_plugins:
        from osprey.worker.adaptor.plugin_manager import load_all_osprey_plugins, plugin_manager as sync_plugin_manager

        load_all_osprey_plugins()
        collected.extend(_flatten(sync_plugin_manager.hook.register_ast_validators()))

    registry = ValidatorRegistry.get_instance()
    # dict.fromkeys drops repeats while preserving first-seen order.
    for validator in dict.fromkeys(collected):
        registry.register_to_instance(validator)
Empty file.
Loading
Loading