Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions osprey_worker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,48 @@ build-backend = "setuptools.build_meta"
[project]
name = "osprey-worker"
version = "0.1.0"
description = "Add your description here"
description = "Osprey rules engine worker"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"flask-cors>=6.0.1",
"osprey_rpc",
# cli + web
"click>=7.1",
"flask>=1.1",
"flask-cors>=6.0",
"werkzeug>=1.0",
# engine core
"gevent>=24.2",
"greenlet>=3.0",
"pydantic>=1.10,<2",
"typing-extensions>=4.6",
"typing-inspect>=0.9",
"result>=0.5",
"deepmerge>=0.3",
"pyyaml>=6.0",
"jsonpath-rw>=1.4",
"pluggy>=1.5",
"ply>=3.11",
# engine UDFs
"python-dateutil>=2.8",
"dnspython>=2.0",
"tld>=0.12",
"unidecode>=1.3",
"python-Levenshtein>=0.12",
"mmh3>=3.0",
"phone-iso3166>=0.3",
"graphviz>=0.20",
# tracing + metrics
"ddtrace>=2.17",
"datadog>=0.51",
# networking
"grpcio>=1.49",
"requests>=2.28",
"protobuf>=4.25",
# observability
"sentry-sdk>=1.5",
]

[project.scripts]
osprey-cli = "osprey.worker.lib.cli:cli"

Expand Down
2 changes: 1 addition & 1 deletion osprey_worker/src/osprey/engine/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import gevent
import gevent.pool
from ddtrace import tracer
from ddtrace.span import Span as TracerSpan
from ddtrace.trace import Span as TracerSpan
from osprey.engine.ast.grammar import ASTNode
from osprey.engine.executor.custom_extracted_features import (
ActionIdExtractedFeature,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ClassVar, Generic, Sequence, Type, TypeVar

from ddtrace.span import Span
from ddtrace.trace import Span
from osprey.engine.ast.grammar import ASTNode

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ._base_node_executor import BaseNodeExecutor

if TYPE_CHECKING:
from ddtrace.span import Span
from ddtrace.trace import Span
from osprey.engine.ast_validator.validation_context import ValidatedSources
from osprey.engine.udf.arguments import ArgumentsBase
from osprey.engine.udf.base import UDFBase
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import functools
import sys
from typing import Any, Callable, Dict, Optional, TypeVar, Union
from typing import Any, Callable, Dict, Optional, TypeVar

import ddtrace
from ddtrace.span import Span
from ddtrace._trace.pin import Pin
from ddtrace.trace import Span
from flask import Flask
from osprey.worker.lib.ddtrace_utils.instrumentation.flask.middleware import TraceMiddleware
from osprey.worker.lib.ddtrace_utils.internal.baggage import Baggage
Expand Down Expand Up @@ -42,7 +43,7 @@ def trace(
service: Optional[str] = None,
resource: Optional[str] = None,
span_type: Optional[str] = None,
tags: Optional[Dict[Union[str, bytes], str]] = None,
tags: Optional[Dict[str, str]] = None,
) -> Span:
span = ddtrace.tracer.trace(name, service, resource, span_type) or _noop_span()
if tags:
Expand All @@ -59,7 +60,7 @@ def current_span() -> Span:


def pin_override(cluster: Any, service: Optional[str], tags: Optional[Dict[str, str]] = None) -> None:
ddtrace.Pin.override(cluster, service, tags=tags)
Pin.override(cluster, service=service, tags=tags)


def get_baggage(span: Span) -> Baggage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from typing import Any, Callable, Dict, List, Optional, Union

import flask.templating
from ddtrace import Span, Tracer
from ddtrace.constants import ERROR_MSG, ERROR_TYPE
from ddtrace.ext import SpanTypes, http
from ddtrace.internal import compat
from ddtrace.internal.logger import get_logger
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.trace import Span, Tracer
from flask import Flask, g, request, signals
from osprey.worker.lib.ddtrace_utils.constants import SpanAttributes
from osprey.worker.lib.ddtrace_utils.internal.globals import baggage_manager, baggage_propagator
Expand Down Expand Up @@ -228,7 +228,9 @@ def _request_exception(self, *args: Any, **kwargs: Any) -> None:
_set_error_on_span(span, exception)

def _finish_span(self, span: Span, exception: Optional[Exception] = None) -> None:
if not span or not span.sampled:
if not span or (
span.context and span.context.sampling_priority is not None and span.context.sampling_priority <= 0
):
return

code: Union[int, str] = span.get_tag(http.STATUS_CODE) or 0
Expand Down Expand Up @@ -267,7 +269,7 @@ def _finish_span(self, span: Span, exception: Optional[Exception] = None) -> Non
span.resource = compat.to_unicode(resource).lower()

span.set_tag(http.URL, url)
span.set_tag(http.STATUS_CODE, code)
span.set_tag(http.STATUS_CODE, str(code))
span.set_tag(http.METHOD, method)

span.finish()
Expand All @@ -278,8 +280,8 @@ def _set_error_on_span(span: Span, exception: Exception) -> None:
# also get the exception from the sys.exc_info (and fill the error meta).
# Since we aren't sure it always work/for insuring no BC break, keep
# these lines which get overridden anyway.
span.set_tag(ERROR_TYPE, type(exception))
span.set_tag(ERROR_MSG, exception)
span.set_tag(ERROR_TYPE, str(type(exception).__name__))
span.set_tag(ERROR_MSG, str(exception))
# The provided `exception` object doesn't have a stack trace attached,
# so attach the stack trace with `set_traceback`.
span.set_traceback()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional

from ddtrace.filters import TraceFilter
from ddtrace.span import Span
from ddtrace.trace import Span, TraceFilter

from ..constants import BaggagePrefix

Expand Down Expand Up @@ -125,7 +124,7 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
for span in trace:
# We need a new dictionary because you can't safely mutate a dict
# while iterating over it
tags: Dict[Union[str, bytes], str] = {}
tags: Dict[str, str] = {}

for k, v in span.get_tags().items():
if isinstance(k, bytes):
Expand Down
2 changes: 1 addition & 1 deletion osprey_worker/src/osprey/worker/lib/osprey_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import gevent
import gevent.pool
from ddtrace.span import Span as TracerSpan
from ddtrace.trace import Span as TracerSpan
from gevent.threadpool import ThreadPool
from osprey.engine.ast.grammar import Assign, Span
from osprey.engine.ast.sources import Sources, SourcesConfig
Expand Down
11 changes: 6 additions & 5 deletions osprey_worker/src/osprey/worker/lib/pigeon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@

import grpc
from ddtrace.constants import ERROR_MSG
from ddtrace.contrib.grpc.constants import GRPC_STATUS_CODE_KEY
from ddtrace.ext.http import STATUS_CODE
from ddtrace.span import Span
from ddtrace.trace import Span
from gevent.pool import Pool
from google.protobuf.message import Message
from grpc import Channel
Expand All @@ -28,6 +27,8 @@
from osprey.worker.lib.pigeon.skip_rate_limit import skip_rate_limit_context
from typing_extensions import TypedDict

GRPC_STATUS_CODE_KEY = 'grpc.status.code'

T = TypeVar('T')


Expand Down Expand Up @@ -567,10 +568,10 @@ def request(
finally:
http_code = _GRPC_HTTP_CODE_TRANSLATIONS[grpc_code]

span.set_tag(GRPC_STATUS_CODE_KEY, grpc_code)
span.set_tag(GRPC_STATUS_CODE_KEY, str(grpc_code))
# Setting the HTTP status code on the tag is a hack to generate more metrics from APM,
# since Datadog automatically generates metrics based on HTTP status but not based on gRPC.
span.set_tag(STATUS_CODE, http_code)
span.set_tag(STATUS_CODE, str(http_code))

if http_code >= 500:
span.error = 1
Expand Down Expand Up @@ -691,7 +692,7 @@ def maybe_start_span(name: str, service: str, resource: str) -> Span:
if existing_span.name == name:
# Add tags to the existing span to indicate the inner service and resource that would have been recorded
# if we had started a new span (otherwise we'd just lose this information).
existing_span.set_tag('pigeon.request.subsumed', True)
existing_span.set_tag('pigeon.request.subsumed', 'true')
existing_span.set_tag('pigeon.request.service', service)
existing_span.set_tag('pigeon.request.resource', resource)

Expand Down
2 changes: 1 addition & 1 deletion osprey_worker/src/osprey/worker/sinks/sink/rules_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import gevent
import sentry_sdk
from ddtrace import tracer
from ddtrace.span import Span as TracerSpan
from ddtrace.trace import Span as TracerSpan
from osprey.engine.executor.execution_context import Action, ExecutionResult
from osprey.engine.executor.udf_execution_helpers import UDFHelpers
from osprey.engine.utils.types import add_slots
Expand Down
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ common = [
"absl-py==2.3.1",
"asttokens==3.0.0",
"blinker==1.9.0",
"bytecode==0.16.2",
"bytecode==0.17.0",
"cachetools==5.5.2",
"certifi==2025.8.3",
"charset-normalizer==3.4.2",
"click==7.1.2",
"datadog==0.51.0",
"ddtrace==2.21.11",
"ddtrace==4.5.0",
"decorator==5.2.1",
"deepmerge==0.3.0",
"dnspython==2.0.0",
"ecdsa==0.19.1",
"envier==0.5.2",
"envier==0.6.1",
"erlpack",
"executing==2.2.0",
"faker==4.18.0",
Expand Down Expand Up @@ -186,7 +186,7 @@ dev = [


[tool.uv]
override-dependencies = ["ddtrace>=2.17.2,<3.0.0"]
override-dependencies = ["ddtrace>=4.0.0"]
default-groups = ["common", "dev"]

[tool.uv.sources]
Expand Down
Loading