Add register_validation_exporter plugin hook#202
Closed
cmttt wants to merge 34 commits intoroostorg:mainfrom
Closed
Add register_validation_exporter plugin hook#202cmttt wants to merge 34 commits intoroostorg:mainfrom
cmttt wants to merge 34 commits intoroostorg:mainfrom
Conversation
Replaces the proposed action_errors_by_type metric with a metric that directly answers: what % of WhenRules blocks ran with incomplete data? Emitted once per WhenRules evaluation in resolve_arguments() with: - action: the action type - degraded: true if any rule or effect dependency was Err(None) A WhenRules block is "degraded" when at least one item in rules_any or then resolved to None due to an upstream failure. This means the enforcement decision was made with incomplete data — a rule that should have been evaluated wasn't, or an effect that should have been applied wasn't. Cardinality: ~200 (100 actions x 2 bool values).
Add osprey.whenrules_completeness metric
Adds per-WhenRules audit data collection and trace context propagation: - WhenRulesAuditEntry dataclass: captures which rules evaluated, matched, failed, and which effects were emitted per WhenRules block - ExecutionContext collects audit entries during execution - ExecutionResult gains trace_id (from ddtrace parent span) and rule_audit_entries fields - WhenRules.resolve_arguments() tracks failed rule names by extracting identifiers from grammar.Name AST nodes - WhenRules.execute() builds audit entry before early return This enables downstream sinks to write per-action rule execution audit logs for auditability and compliance.
- Add @add_slots to WhenRulesAuditEntry for consistency with other dataclasses in execution_context.py - Replace getattr fallback with assertion + direct access in WhenRules.execute() to surface invariant violations - Add comment explaining why self storage is safe (synchronous execution, no yield between resolve_arguments and execute)
Add rule audit logging and trace_id to ExecutionResult
Return 0 instead of throwing ZeroDivisionError when dividing by zero. This matches the + 0.001 epsilon pattern already used by guarded rules, and prevents cascading Err(None) propagation that silently disables downstream rules. 32 unguarded division operations across 6 rule files were causing 947K Sentry events (DISCORD-SMITE-RULES-SINK-21H).
Add safe division to prevent ZeroDivisionError in rule evaluation
JSON payloads are inherently loosely typed — BigIntegers become strings,
booleans appear where strings are expected, etc. Requiring exact type
matches by default causes InvalidJsonType errors that silently disable
rules (11.6M+ Sentry events).
With coerce_type=True by default, the engine attempts to convert the
value to the declared type (e.g., int("123") or str(False)). If
coercion fails, InvalidJsonType is still raised.
This affects both JsonData and EntityJson (EntityJsonArguments inherits
from json_data.Arguments).
Change coerce_type default to True for JsonData/EntityJson
New workspace package that replaces gevent with asyncio for the executor and worker loop. Shares engine code (execution graphs, compilation, UDF registry) with osprey_worker. Includes async executor, plugin interfaces, sync-to-async adapters, and a minimal CLI for validation and benchmarking.
Add worker_type:gevent tag to gevent worker metrics and worker_type:async to async worker metrics so both can be distinguished in Datadog dashboards when running parallel deployments. Also adds stdlib-only bootstrap mode and test data for the async worker CLI.
Publishes execution results to BQ Pub/Sub with source='osprey-async' so async worker results can be compared against gevent worker results (source='osprey') in BigQuery. This is the only output sink needed for shadow mode — no Druid, SafetyRecord, or effect sinks. Uses the same SmiteExecutionResultBigQueryPubsubEvent model and PubSubPublisher as the gevent worker. Runs the sync publisher in a thread pool via run_in_executor.
New plugin system for the async worker using 'osprey_async_plugin' setuptools entry_point group, separate from the gevent 'osprey_plugin'. Hookspecs: - register_async_output_sinks: async-only output sinks (AsyncBaseOutputSink) - register_udfs: same UDFBase types as sync (run via run_in_executor) - register_ast_validators: same interface as sync Plugin manager merges UDFs and validators from both async and sync plugin groups, but only loads output sinks from async plugins since gevent-based sinks won't work without monkey patching. Also corrects deployment strategy: coordinator uses single-delivery (not broadcast), so shadow mode needs a separate coordinator instance with its own Pub/Sub subscription.
The executor now detects AsyncUDFBase instances and awaits them directly on the event loop instead of dispatching through run_in_executor. This avoids thread pool overhead for UDFs that use native async I/O (grpc.aio, aiohttp, etc.). Dispatch logic: - AsyncUDFBase → await udf.execute() directly (no thread pool) - AsyncBatchableUDFBase → await udf.execute_batch() directly - Regular UDFBase → run_in_executor (thread pool, backward compat) All three paths share the same semaphore for concurrency control and emit identical metrics with the worker_type:async tag.
Rewrite based on the realization that existing gevent-based UDFs can't work in the async worker anyway (no monkey patching = broken gevent primitives). This means every I/O UDF must be a native AsyncUDFBase subclass — no point bridging with run_in_executor. Executor now has two clean paths: - Sync UDFs (execute_async=False): run inline, pure computation only - Async UDFs (AsyncUDFBase): awaited as tasks with semaphore AsyncUDFBase extends UDFBase so it works with UDFRegistry, CallExecutor, validation, type checking, and argument resolution. Uses async_execute() instead of execute() to distinguish the async path. Removed: SyncUDFAdapter, SyncBatchableUDFAdapter, SyncOutputSinkAdapter, run_in_executor thread pool paths.
Instead of adding worker_type to every individual metric call site,
set it once at startup via set_worker_type_tag(). The DogStatsd
constant_tags mechanism automatically appends it to ALL metrics.
- Gevent worker: set_worker_type_tag('gevent') in init_config()
- Async worker: set_worker_type_tag('async') in init_config()
- Removed per-call-site WORKER_TYPE_TAG from executor, rules sink,
output sink, and BQ shadow sink
- Deleted standalone metric_tags.py modules from both packages
19 tests covering: - Executor: sync UDFs inline, dependency chains, rule evaluation, missing paths, empty rules, complex graphs, sync-only mode - Sinks: static input stream, multi-output sink (push, will_do_work, failure isolation, timeout handling, stop propagation) - ExecutionResult structure validation - Parity with gevent executor for stdlib UDFs
Updated benchmark CLI to run both gevent and async executors against the same rules + actions for direct comparison. Includes warmup iterations to eliminate JIT/cache effects. Results (5000 iterations, stdlib UDFs only): Gevent: 1560 actions/sec, 0.64ms avg latency Async: 3139 actions/sec, 0.32ms avg latency Ratio: 2.01x faster Also fixed osprey_v2_skip_async_classification -> osprey_skip_async_classification.
The executor now has 3 dispatch paths: - Sync inline: execute_async=False UDFs (pure computation) - Native async: AsyncUDFBase UDFs (awaited on event loop) - Legacy fallback: execute_async=True but not AsyncUDFBase (thread pool) The legacy fallback enables using the same rules from ~/smite-rules during the migration. Unported UDFs run in a thread pool and may fail on gevent calls, but errors are captured gracefully in error_infos. As UDFs get ported to AsyncUDFBase they move to native async. Also fixes osprey_skip_async_classification (removed _v2 prefix).
…l service Phase 1 async infrastructure for the experimental worker: Async pigeon client (lib/pigeon/client.py): - grpc.aio channels instead of gevent-patched sync gRPC - asyncio.Semaphore for concurrency (replaces gevent.pool.Pool) - asyncio.gather for chunked routing (replaces pool.imap_unordered) - contextvars for skip_rate_limit (replaces gevent.local) - Same public API: RoutedClient, RoutingType, RetryPolicy Async coordinator input stream (lib/coordinator_input_stream.py): - grpc.aio bidirectional streaming with coordinator - asyncio.Queue for request/response flow - AsyncVerdictsAckingContext for ack/nack - Graceful shutdown via asyncio.Event - Reconnection with jitter Async etcd sources provider (lib/etcd/sources_provider.py): - AsyncEtcdSourcesProvider using run_in_executor for sync etcd client - AsyncInputStreamReadySignaler using asyncio.Event - Live rule updates from etcd with pause/resume coordination Async external service cache (lib/external_service.py): - asyncio.Future instead of gevent.event.AsyncResult - Same deduplication behavior for concurrent callers - TTL-based cache expiration - Batch operations with Ok/Err results 51 tests passing covering all new infrastructure.
Add osprey_async_worker package (gevent -> asyncio)
…worker New AsyncOspreyEngine replaces OspreyEngine for the async worker: - Uses stdlib concurrent.futures.ThreadPoolExecutor for rule compilation instead of gevent.pool.ThreadPool - Provides async execute() that calls the async executor directly - AsyncRulesSink now uses AsyncOspreyEngine (no gevent imports) - ActionSampler inlined to avoid importing gevent rules_sink module - Benchmark gevent comparison is now optional (try/except ImportError) The async worker's runtime code has zero gevent imports. Gevent is only a transitive dependency through osprey_worker types (UDFBase, etc.) which don't use gevent themselves.
bootstrap_async_udfs and bootstrap_async_ast_validators now only load from osprey_async_plugin by default. Loading sync plugins (osprey_plugin) is opt-in via load_sync_plugins=True since it triggers module-level side effects: gevent monkey patching, etcd connections, Directory.instance(). The async plugin package re-exports all needed UDFs, so sync plugins aren't needed for normal operation.
AsyncOspreyEngine + async bootstrap improvements
Add from_direct_address factory to coordinator input stream
…ow sink New AsyncPubSubPublisher: - asyncio.Queue buffer with background flush task - Batches up to 250 messages or flushes after 1s - Flush runs sync publishes in run_in_executor - Graceful shutdown flushes remaining messages - No threading locks, no gevent, no background threads Removed BQ shadow sink from osprey — it imports from discord_smite and belongs in the Discord repo (discord_smite/osprey_async_plugins).
Use direct PublisherClient in BQ shadow sink
Adds a firstresult hookspec so plugins can inject a custom BaseValidationResultExporter into OspreyEngine at bootstrap time. bootstrap_engine_with_helpers() now calls bootstrap_validation_exporter() and passes the result to OspreyEngine; falls back to NullValidationResultExporter if no plugin implements the hook.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
register_validation_exporter(config)hookspec (firstresult=True) toosprey_hooks.py— plugins can inject aBaseValidationResultExporterinto the engine at bootstrapbootstrap_validation_exporter(config)toplugin_manager.py— calls the hook, falls back toNullValidationResultExporterif not implementedbootstrap_engine_with_helpers()to callbootstrap_validation_exporter()and pass the result toOspreyEngineBackground
After migrating osprey workers to use
osprey.worker.lib.singletons.ENGINE,bootstrap_engine_with_helpers()defaulted toNullValidationResultExporter, silently dropping experiment metadata publishes. The hookspec is optional (no-op default) so existing plugins that don't implement it are unaffected.Test plan
register_validation_exporterhookimpl