Skip to content
10 changes: 10 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,16 @@
# Name of deployment request routing stats method implemented by user.
REQUEST_ROUTING_STATS_METHOD = "record_routing_stats"

# Initial delay, in seconds, between polls of a deployment's replicas for their
# outbound deployments. Resolved through get_env_float_positive with a default
# of 1.0. This is the starting point of the exponential backoff.
RAY_SERVE_OUTBOUND_DEPLOYMENTS_INITIAL_POLL_DELAY_S = get_env_float_positive(
"RAY_SERVE_OUTBOUND_DEPLOYMENTS_INITIAL_POLL_DELAY_S", 1.0
)

# Maximum delay, in seconds, between polls of a deployment's replicas for their
# outbound deployments. Resolved through get_env_float_positive with a default
# of 600.0. The exponential backoff never grows the delay beyond this value.
RAY_SERVE_OUTBOUND_DEPLOYMENTS_MAX_POLL_DELAY_S = get_env_float_positive(
"RAY_SERVE_OUTBOUND_DEPLOYMENTS_MAX_POLL_DELAY_S", 600.0
)

# By default, we run user code in a separate event loop.
# This flag can be set to 0 to run user code in the same event loop as the
# replica's main event loop.
Expand Down
163 changes: 163 additions & 0 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
RAY_SERVE_ENABLE_TASK_EVENTS,
RAY_SERVE_FAIL_ON_RANK_ERROR,
RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS,
RAY_SERVE_OUTBOUND_DEPLOYMENTS_INITIAL_POLL_DELAY_S,
RAY_SERVE_OUTBOUND_DEPLOYMENTS_MAX_POLL_DELAY_S,
RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY,
REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
SERVE_LOGGER_NAME,
Expand Down Expand Up @@ -280,6 +282,11 @@ def __init__(
self._last_record_routing_stats_time: float = 0.0
self._ingress: bool = False

# Outbound deployments polling state
self._outbound_deployments: Optional[List[DeploymentID]] = None
self._outbound_deployments_ref: Optional[ObjectRef] = None
self._last_outbound_deployments_poll_time: float = 0.0

@property
def replica_id(self) -> str:
return self._replica_id
Expand Down Expand Up @@ -938,6 +945,68 @@ def _should_record_routing_stats(self) -> bool:
)
return time_since_last > randomized_period

def _should_poll_outbound_deployments(self, poll_period_s: float) -> bool:
    """Decide whether a fresh outbound deployments poll may be started now.

    Both of the following must hold:
    1) No poll is currently in flight, i.e. self._outbound_deployments_ref
       is `None`. Whoever consumes a finished (or failed) poll is expected
       to reset that attribute to `None`.
    2) At least poll_period_s seconds have elapsed since the previous poll
       was *started* (not since it completed).

    Args:
        poll_period_s: Minimum spacing between poll starts, in seconds.

    Returns:
        True when a new poll should be kicked off, False otherwise.
    """
    # Short-circuits on an in-flight poll, so the clock is only read when
    # no poll is pending.
    return (
        self._outbound_deployments_ref is None
        and time.time() - self._last_outbound_deployments_poll_time
        >= poll_period_s
    )

def poll_outbound_deployments(
    self, poll_period_s: float
) -> Optional[List[DeploymentID]]:
    """Poll the replica actor for its outbound deployments without blocking.

    Each call does two things:
    1) If a previously started poll has finished, its result is stored as
       the cached value and the in-flight reference is cleared. A failed
       poll is logged and leaves the previous cached value untouched.
    2) If no poll is in flight and the poll period has elapsed (see
       _should_poll_outbound_deployments), a new poll is started.

    Args:
        poll_period_s: The period between polls in seconds.

    Returns:
        The most recently cached outbound deployments, or None if no poll
        has completed successfully yet.
    """
    # Harvest the in-flight poll, but only once its result is ready.
    pending_ref = self._outbound_deployments_ref
    if pending_ref is not None and check_obj_ref_ready_nowait(pending_ref):
        try:
            self._outbound_deployments = ray.get(pending_ref)
        except Exception:
            # logger.exception already appends the active traceback, so it
            # must not be concatenated into the message a second time.
            logger.exception(
                "Exception when trying to get outbound deployments from "
                f"{self._replica_id}."
            )
        finally:
            # Success or failure, the poll is over; allow the next one.
            self._outbound_deployments_ref = None

    # Initiate a new poll if needed.
    if self._should_poll_outbound_deployments(poll_period_s):
        # Stamp the start time first so that a failed launch is still
        # rate-limited by poll_period_s.
        self._last_outbound_deployments_poll_time = time.time()
        try:
            self._outbound_deployments_ref = (
                self._actor_handle.list_outbound_deployments.remote()
            )
        except Exception:
            logger.exception(
                f"Failed to initiate outbound deployments poll for {self._replica_id}"
            )
            self._outbound_deployments_ref = None

    return self._outbound_deployments

def check_health(self) -> bool:
"""Check if the actor is healthy.

Expand Down Expand Up @@ -1289,6 +1358,19 @@ def pull_routing_stats(self) -> Optional[Dict[str, Any]]:
"""
return self._actor.get_routing_stats()

def poll_outbound_deployments(
    self, poll_period_s: float
) -> Optional[List[DeploymentID]]:
    """Forward an outbound deployments poll to the underlying actor wrapper.

    Args:
        poll_period_s: The period between polls in seconds.

    Returns:
        Whatever the actor wrapper currently has cached: the outbound
        deployments if available, None otherwise.
    """
    actor_wrapper = self._actor
    cached_deployments = actor_wrapper.poll_outbound_deployments(poll_period_s)
    return cached_deployments

def update_state(self, state: ReplicaState) -> None:
"""Updates state in actor details."""
self.update_actor_details(state=state)
Expand Down Expand Up @@ -1785,6 +1867,15 @@ def __init__(
self._docs_path: Optional[str] = None
self._route_patterns: Optional[List[str]] = None

# Outbound deployments polling state
self._outbound_deployments_cache: Optional[Set[DeploymentID]] = None
self._outbound_poll_delay: float = (
RAY_SERVE_OUTBOUND_DEPLOYMENTS_INITIAL_POLL_DELAY_S
)
self._max_outbound_poll_delay: float = (
RAY_SERVE_OUTBOUND_DEPLOYMENTS_MAX_POLL_DELAY_S
)

def should_autoscale(self) -> bool:
"""
Check if the deployment is under autoscaling
Expand Down Expand Up @@ -2162,6 +2253,10 @@ def deploy(self, deployment_info: DeploymentInfo) -> bool:
self._curr_status_info = self._curr_status_info.handle_transition(
trigger=DeploymentStatusInternalTrigger.CONFIG_UPDATE
)
# Reset outbound deployments poll delay to quickly poll the new version
self._outbound_poll_delay = (
RAY_SERVE_OUTBOUND_DEPLOYMENTS_INITIAL_POLL_DELAY_S
)

logger.info(
f"Deploying new version of {self._id} "
Expand Down Expand Up @@ -2658,6 +2753,9 @@ def check_and_update_replicas(self):
transition happened.
"""

# Poll outbound deployments from all RUNNING replicas with exponential backoff
self.poll_outbound_deployments_if_needed()

for replica in self._replicas.pop(
states=[ReplicaState.RUNNING, ReplicaState.PENDING_MIGRATION]
):
Expand Down Expand Up @@ -2969,6 +3067,53 @@ def _stop_one_running_replica_for_testing(self):
def is_ingress(self) -> bool:
return self._target_state.info.ingress

def poll_outbound_deployments_if_needed(self) -> None:
    """Poll all RUNNING replicas for their outbound deployments.

    The per-replica results are unioned into the deployment-level cache.
    A replica that reports an empty list still counts as a result, so a
    deployment with no outbound deployments ends up with an empty cache
    rather than staying at None ("not yet polled"), and its polling backs
    off like any other deployment.

    Uses exponential backoff for polling frequency, capping at
    RAY_SERVE_OUTBOUND_DEPLOYMENTS_MAX_POLL_DELAY_S. The delay is only
    increased while the deployment status is HEALTHY.
    """
    running_replicas = self._replicas.get([ReplicaState.RUNNING])
    if not running_replicas:
        return

    # None means "this replica has no completed poll yet"; an empty
    # collection is a real answer and must not be discarded.
    reported = [
        deployments
        for deployments in (
            replica.poll_outbound_deployments(self._outbound_poll_delay)
            for replica in running_replicas
        )
        if deployments is not None
    ]
    if not reported:
        # No replica has answered yet; keep polling at the current delay.
        return

    discovered = set().union(*reported)
    if self._outbound_deployments_cache is None:
        self._outbound_deployments_cache = discovered
    else:
        # Union the new outbound deployments with the cached set to ensure
        # we don't miss deployments that may have been added dynamically
        # (e.g., created at runtime in conditional branches or after
        # initialization).
        self._outbound_deployments_cache = (
            self._outbound_deployments_cache | discovered
        )

    # If the deployment is healthy, increase the poll delay.
    # NOTE(review): replicas return their cached value on every call, so
    # once one has answered this doubling runs on each invocation rather
    # than once per completed poll -- confirm that is intended.
    if self.curr_status_info.status == DeploymentStatus.HEALTHY:
        self._outbound_poll_delay = min(
            self._outbound_poll_delay * 2, self._max_outbound_poll_delay
        )
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Exponential Backoff Fails for Non-Healthy Polls

The exponential backoff for outbound deployments polling only increases the delay when the deployment status is HEALTHY. This means that during deployment updates, rollouts, or any non-HEALTHY states (UPDATING, UPSCALING, DOWNSCALING, etc.), the poll delay will never increase and replicas will be polled at the initial 1-second interval indefinitely. This defeats the purpose of exponential backoff and can cause excessive polling during long-running deployment operations. The backoff should increase based on successful polls, not deployment health status.

Fix in Cursor Fix in Web


def get_outbound_deployments(self) -> Optional[List[DeploymentID]]:
    """Return the cached outbound deployments as a sorted list.

    Returns:
        The deployment IDs that this deployment calls, in sorted order,
        or None if nothing has been cached yet.
    """
    cached = self._outbound_deployments_cache
    # NOTE(review): sorting assumes the cached IDs support ordering
    # comparisons -- confirm for DeploymentID.
    return None if cached is None else sorted(cached)


class DeploymentStateManager:
"""Manages all state for deployments in the system.
Expand Down Expand Up @@ -3577,3 +3722,21 @@ def _get_replica_ranks_mapping(self, deployment_id: DeploymentID) -> Dict[str, i
return {}

return deployment_state._get_replica_ranks_mapping()

def get_deployment_outbound_deployments(
    self, deployment_id: DeploymentID
) -> Optional[List[DeploymentID]]:
    """Look up the cached outbound deployments of one deployment.

    Args:
        deployment_id: The deployment whose outbound deployments are wanted.

    Returns:
        The value reported by that deployment's state (a list of the
        deployment IDs it calls, or None if it hasn't been polled yet),
        or None when the deployment is not tracked by this manager.
    """
    state = self._deployment_states.get(deployment_id)
    return None if state is None else state.get_outbound_deployments()
Loading