Skip to content

feat: Add state-modifying retry policies #2957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 74 additions & 16 deletions docs/docs/how-tos/node-retries.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -41,7 +41,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -78,20 +78,9 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"RetryPolicy(initial_interval=0.5, backoff_factor=2.0, max_interval=128.0, max_attempts=3, jitter=True, retry_on=<function default_retry_on at 0x78b964b89940>)"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"outputs": [],
"source": [
"from langgraph.pregel import RetryPolicy\n",
"\n",
Expand Down Expand Up @@ -131,7 +120,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -180,6 +169,75 @@
"\n",
"graph = builder.compile()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## State-Modifying Retry Policies\n",
"\n",
"Sometimes you might want to modify the state before retrying after an error occurs. LangGraph now supports this through a dictionary mapping of exceptions to state-modifying functions in the retry policy.\n"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Exception occured: Error occurred in processing\n",
"Counter: 0\n",
"Exception occured: Error occurred in processing\n",
"Counter: 1\n",
"Exception occured: Error occurred in processing\n",
"Counter: 2\n",
"{'counter': 3, 'result': 'Success!'}\n"
]
}
],
"source": [
"class CounterState(TypedDict):\n",
" counter: int\n",
" result: str\n",
"\n",
"\n",
"def increment_counter(state: CounterState, exception: Exception):\n",
" print(f\"Exception occured: {exception}\")\n",
" print(f\"Counter: {state['counter']}\")\n",
" state['counter'] += 1\n",
"\n",
"def processing_node(state: CounterState) -> CounterState:\n",
" if state['counter'] < 3:\n",
" raise ValueError(\"Error occurred in processing\")\n",
" state['result'] = \"Success!\"\n",
" return state\n",
"\n",
"workflow = StateGraph(CounterState)\n",
"\n",
"retry_policy = RetryPolicy(\n",
" max_attempts=5,\n",
" retry_on={ValueError: increment_counter}\n",
")\n",
"\n",
"\n",
"workflow.add_node(\"process\", processing_node, retry=retry_policy)\n",
"workflow.add_edge(START, \"process\")\n",
"workflow.add_edge(\"process\", END)\n",
"\n",
"app = workflow.compile()\n",
"final_state = app.invoke({\"counter\": 0, \"result\": \"\"})\n",
"print(final_state)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example, we define a retry policy that maps `ValueError` to the `increment_counter` function. When a `ValueError` occurs, `increment_counter` is called with the current state and the exception before the retry attempt. "
]
}
],
"metadata": {
Expand Down
10 changes: 10 additions & 0 deletions libs/langgraph/langgraph/pregel/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def run_with_retry(
elif callable(retry_policy.retry_on):
if not retry_policy.retry_on(exc): # type: ignore[call-arg]
raise
elif isinstance(retry_policy.retry_on, dict):
exception_handler = retry_policy.retry_on.get(type(exc), False)
if callable(exception_handler):
exception_handler(task.input, exc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing else branch here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @nfcampos,
Thank you for your review.
I fixed it and pushed the changes.
I apologize for this oversight.

else:
raise TypeError(
"retry_on must be an Exception class, a list or tuple of Exception classes, or a callable"
Expand Down Expand Up @@ -166,6 +170,12 @@ async def arun_with_retry(
elif callable(retry_policy.retry_on):
if not retry_policy.retry_on(exc): # type: ignore[call-arg]
raise
elif isinstance(retry_policy.retry_on, dict):
exception_handler = retry_policy.retry_on.get(type(exc), False)
if callable(exception_handler):
exception_handler(task.input, exc)
else:
raise
else:
raise TypeError(
"retry_on must be an Exception class, a list or tuple of Exception classes, or a callable"
Expand Down
7 changes: 5 additions & 2 deletions libs/langgraph/langgraph/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,12 @@ class RetryPolicy(NamedTuple):
jitter: bool = True
"""Whether to add random jitter to the interval between retries."""
retry_on: Union[
Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool]
Type[Exception],
Sequence[Type[Exception]],
Callable[[Exception], bool],
dict[Type[Exception], Callable[[Any, Exception], None]],
] = default_retry_on
"""List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry."""
"""List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry, or a dictionary mapping exception classes to callables that can modify state before retrying. The state-modifying callables should accept (state, exception) as parameters."""


class CachePolicy(NamedTuple):
Expand Down