Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 11 additions & 49 deletions ddtrace/internal/openfeature/_exposure.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
"""

import time
from typing import Any
from typing import Dict
from typing import Optional

from openfeature.evaluation_context import EvaluationContext
Expand All @@ -31,70 +29,34 @@ def build_exposure_event(
Args:
flag_key: The feature flag key
variant_key: The variant key returned by the evaluation
allocation_key: The allocation key (same as variant_key in basic cases)
allocation_key: The allocation key
evaluation_context: The evaluation context with subject information
"""
# Validate required fields
if not flag_key:
logger.debug("Cannot build exposure event: flag_key is required")
return None

if variant_key is None:
variant_key = ""
if not variant_key:
logger.debug("Cannot build exposure event: variant_key is required")
return None

# Build subject from evaluation context
subject = _build_subject(evaluation_context)
if not subject:
logger.debug("Cannot build exposure event: valid subject is required")
if not allocation_key:
logger.debug("Cannot build exposure event: allocation_key is required")
return None

# Use variant_key as allocation_key if not explicitly provided
if allocation_key is None:
allocation_key = variant_key
evaluation_context = evaluation_context or EvaluationContext()

# Build the exposure event
exposure_event: ExposureEvent = {
"timestamp": int(time.time() * 1000), # milliseconds since epoch
"allocation": {"key": allocation_key},
"flag": {"key": flag_key},
"variant": {"key": variant_key},
"subject": subject,
"subject": {
"id": evaluation_context.targeting_key or "",
"attributes": evaluation_context.attributes or {},
},
}

return exposure_event


def _build_subject(evaluation_context: Optional[EvaluationContext]) -> Optional[Dict[str, Any]]:
    """
    Construct the exposure-event subject payload from an OpenFeature context.

    A subject needs at minimum an 'id' field, which is taken from the
    context's ``targeting_key``; without one no subject can be built.

    Args:
        evaluation_context: The OpenFeature evaluation context, or None.

    Returns:
        A dict carrying 'id' (plus optional 'type' and 'attributes'),
        or None when no subject id can be determined.
    """
    if evaluation_context is None:
        return None

    subject_id = evaluation_context.targeting_key
    if not subject_id:
        logger.debug("evaluation_context missing targeting_key for subject.id")
        return None

    raw_attrs = evaluation_context.attributes or {}
    result: Dict[str, Any] = {"id": subject_id}

    # 'subject_type' is promoted to the top-level 'type' field rather than
    # being passed through as a regular attribute.
    if "subject_type" in raw_attrs:
        result["type"] = str(raw_attrs["subject_type"])

    extra = {key: val for key, val in raw_attrs.items() if key != "subject_type"}
    if extra:
        result["attributes"] = extra

    return result
119 changes: 86 additions & 33 deletions ddtrace/internal/openfeature/_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
This module handles Feature Flag configuration rules from Remote Configuration
and forwards the raw bytes to the native FFE processor.
"""

from collections import OrderedDict
from collections.abc import MutableMapping
from importlib.metadata import version
import typing

Expand Down Expand Up @@ -40,9 +43,44 @@


T = typing.TypeVar("T", covariant=True)
K = typing.TypeVar("K")
V = typing.TypeVar("V")
logger = get_logger(__name__)


class LRUCache(MutableMapping, typing.Generic[K, V]):
    """A bounded mapping with least-recently-used eviction.

    Backed by an OrderedDict whose iteration order tracks recency:
    the first entry is the coldest (least recently used), the last
    the hottest. Implements the full MutableMapping interface, so
    mixin methods such as ``get``, ``pop`` and ``update`` work too.
    """

    def __init__(self, maxsize: int = 128):
        # maxsize bounds the cache; inserts beyond it evict the coldest entry.
        self._maxsize = maxsize
        self._cache: typing.OrderedDict[K, V] = OrderedDict()

    def __getitem__(self, key: K) -> V:
        """Return the cached value, marking *key* as most recently used."""
        # move_to_end raises KeyError for unknown keys, matching dict lookup.
        self._cache.move_to_end(key)
        return self._cache[key]

    def __setitem__(self, key: K, value: V) -> None:
        """Store *key*, refreshing its recency; evict the coldest if over capacity."""
        if key in self._cache:
            self._cache.move_to_end(key)
        self._cache[key] = value
        while len(self._cache) > self._maxsize:
            # popitem(last=False) removes the least recently used entry.
            self._cache.popitem(last=False)

    def __delitem__(self, key: K) -> None:
        """Remove *key* from the cache (KeyError when absent)."""
        del self._cache[key]

    def __iter__(self) -> typing.Iterator[K]:
        """Yield keys from least to most recently used."""
        yield from self._cache

    def __len__(self) -> int:
        """Return the number of cached entries."""
        return len(self._cache)


class DataDogProvider(AbstractProvider):
"""
Datadog OpenFeature Provider.
Expand All @@ -58,8 +96,11 @@ def __init__(self, *args: typing.Any, **kwargs: typing.Any):
self._config_received = False

# Cache for reported exposures to prevent duplicates
# Stores tuples of (flag_key, variant_key, allocation_key)
self._exposure_cache: typing.Set[typing.Tuple[str, typing.Optional[str], typing.Optional[str]]] = set()
# Stores mapping of (flag_key, subject_id) -> (allocation_key, variant_key)
# Using LRU cache with maxsize of 65536 to prevent unbounded memory growth
self._exposure_cache: LRUCache[
typing.Tuple[str, str], typing.Tuple[typing.Optional[str], typing.Optional[str]]
] = LRUCache(maxsize=65536)

# Check if experimental flagging provider is enabled
self._enabled = ffe_config.experimental_flagging_provider_enabled
Expand Down Expand Up @@ -209,13 +250,8 @@ def _resolve_details(
)

# No configuration available - return default
# Note: No exposure logging when configuration is missing
if details is None:
self._report_exposure(
flag_key=flag_key,
variant_key=None,
allocation_key=None,
evaluation_context=evaluation_context,
)
return FlagResolutionDetails(
value=default_value,
reason=Reason.DEFAULT,
Expand All @@ -229,12 +265,14 @@ def _resolve_details(

# Flag not found - return default with DEFAULT reason
if details.error_code == ffe.ErrorCode.FlagNotFound:
self._report_exposure(
flag_key=flag_key,
variant_key=None,
allocation_key=None,
evaluation_context=evaluation_context,
)
# Only report exposure if do_log is explicitly True
if details.do_log:
self._report_exposure(
flag_key=flag_key,
variant_key=None,
allocation_key=None,
evaluation_context=evaluation_context,
)
return FlagResolutionDetails(
value=default_value,
reason=Reason.DEFAULT,
Expand All @@ -252,13 +290,14 @@ def _resolve_details(
# Map native ffe.Reason to OpenFeature Reason
reason = self._map_reason_to_openfeature(details.reason)

# Report exposure event
self._report_exposure(
flag_key=flag_key,
variant_key=details.variant,
allocation_key=details.allocation_key,
evaluation_context=evaluation_context,
)
# Report exposure event only if do_log flag is True
if details.do_log:
self._report_exposure(
flag_key=flag_key,
variant_key=details.variant,
allocation_key=details.allocation_key,
evaluation_context=evaluation_context,
)

# Check if variant is None/empty to determine if we should use default value.
# For JSON flags, value can be null which is valid, so we check variant instead.
Expand Down Expand Up @@ -297,27 +336,41 @@ def _report_exposure(
Report a feature flag exposure event to the EVP proxy intake.

Uses caching to prevent duplicate exposure events for the same
(flag_key, variant_key, allocation_key) combination.
(flag_key, subject_id, variant_key, allocation_key) combination.

Note: This method should only be called when exposure logging is enabled.
Callers must check the do_log flag before invoking this method.

Args:
flag_key: The feature flag key
variant_key: The variant key returned by evaluation
allocation_key: The allocation key
evaluation_context: The evaluation context with subject information
"""
try:
# Check cache to prevent duplicate exposure events
cache_key = (flag_key, variant_key, allocation_key)
if cache_key in self._exposure_cache:
logger.debug("Skipping duplicate exposure event for %s", cache_key)
return

exposure_event = build_exposure_event(
flag_key=flag_key,
variant_key=variant_key,
allocation_key=allocation_key,
evaluation_context=evaluation_context,
)
if not exposure_event:
return

# Check cache to prevent duplicate exposure events
key = (flag_key, exposure_event["subject"]["id"])
value = (allocation_key, variant_key)

cached_value = self._exposure_cache.get(key, None)
if cached_value and cached_value == value:
logger.debug("Skipping duplicate exposure event for %s->%s", key, value)
return

writer = get_exposure_writer()
writer.enqueue(exposure_event)

if exposure_event:
writer = get_exposure_writer()
writer.enqueue(exposure_event)
# Add to cache only after successful enqueue
self._exposure_cache.add(cache_key)
# Add to cache only after successful enqueue
self._exposure_cache[key] = value
except Exception as e:
logger.debug("Failed to report exposure event: %s", e, exc_info=True)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
fixes:
- |
openfeature: Fix exposure event deduplication to use (flag_key, subject_id) as cache key
instead of (flag_key, variant_key, allocation_key). This ensures different users each
receive their own exposure event while still deduplicating repeated evaluations for the
same user. Also adds LRU eviction to prevent unbounded memory growth and respects the
do_log flag from flag metadata.
Loading
Loading