Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature:Dynamic Workflow Mode Implementation for Conditional Edges #3345

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

WenPulse
Copy link

@WenPulse WenPulse commented Feb 7, 2025

Overview

Building upon the existing LangGraph framework, I developed and implemented a complete workflow in my project. I discovered that using add_conditional_edges in the workflow had certain flaws in executing subsequent nodes. By applying monkey patch, I temporarily fixed the issues related to conditional node execution within the workflow and designed an Analyzer to maintain the directed graph of execution paths. This improvement allows users to enable the new execution mode by setting workflow_mode=True during compilation. If not set or set to False, the original execution mode is used by default. I hope this approach can be adopted.


Problem Description

Prerequisite: In the workflow, each node should execute only once unless special handling logic is set.

When handling conditional branches with LangGraph, the following execution issues were discovered:

  1. Set up a selector node (A) with two executable branches, Type 1 and Type 2:

    • Type 1: Execute text spell correction (Node B).
    • Type 2: Execute both text spell correction (Node B) and text syntax correction (Node C).
  2. When the results of both types point to Node E, LangGraph's parallel processing mechanism causes Node E to execute only once.

import operator
from typing import Annotated, Any

from typing_extensions import TypedDict
from langgraph.graph import END, START, StateGraph

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    type: str


class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}


builder = StateGraph(State)
builder.add_node("A", ReturnNodeValue("I'm A"))
builder.add_node("B", ReturnNodeValue("I'm B"))
builder.add_node("C", ReturnNodeValue("I'm C"))
builder.add_node("E", ReturnNodeValue("I'm E"))
intermediates = ["B", "C"]
builder.add_conditional_edges(
    "A",
    lambda state: ["B"] if state["type"] == "one" else ["B", "C"],
    path_map=intermediates,
)
builder.add_edge(START, "A")
builder.add_edge("B", "E")
builder.add_edge("C", "E")
builder.add_edge("E", END)
graph = builder.compile()
graph.invoke({"aggregate": [], "type": "two", "fanout_values": []})

pic1

Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm E to ["I'm A", "I'm B", "I'm C"]
  1. However, if Node D is added after Node B, making Node D execute Node E, then for Type 2, Node E is executed twice (paths A → C → E and A → B → D → E), which does not align with the expected workflow execution.
builder.add_node("A", ReturnNodeValue("I'm A"))
builder.add_node("B", ReturnNodeValue("I'm B"))
builder.add_node("C", ReturnNodeValue("I'm C"))
builder.add_node("D", ReturnNodeValue("I'm D"))
builder.add_node("E", ReturnNodeValue("I'm E"))
intermediates = ["B", "C"]
builder.add_conditional_edges(
    "A",
    lambda state: ["B"] if state["type"] == "one" else ["B", "C"],
    path_map=intermediates,
)
builder.add_edge(START, "A")
builder.add_edge("B", "D")
builder.add_edge("D", "E")
builder.add_edge("C", "E")
builder.add_edge("E", END)
graph = builder.compile()
graph.invoke({"aggregate": [], "type": "two", "fanout_values": []})

pic2

Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
Adding I'm E to ["I'm A", "I'm B", "I'm C"]
Adding I'm E to ["I'm A", "I'm B", "I'm C", "I'm D", "I'm E"]

Attempts in Existing LangGraph:

  • If Nodes C and D are set to both succeed before executing Node E, Node E can never be triggered in Type 1.
# builder.add_edge("C", "E")
builder.add_edge(["C", "D"], "E")
builder.add_edge("E", END)
graph = builder.compile()
graph.invoke({"aggregate": [], "type": "one", "fanout_values": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B"]
  • If nodes C and D are set to execute simultaneously or if node E is triggered after node C executes alone, then node E will be executed repeatedly.
builder.add_edge("C", "E")
builder.add_edge(["C", "D"], "E")
builder.add_edge("E", END)
graph = builder.compile()
graph.invoke({"aggregate": [], "type": "two", "fanout_values": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
Adding I'm E to ["I'm A", "I'm B", "I'm C"]
Adding I'm E to ["I'm A", "I'm B", "I'm C", "I'm D", "I'm E"]

Since each node is dynamically generated, specific branch paths cannot be determined before workflow execution. Therefore, it is necessary to dynamically choose the appropriate path during execution.


Solution

Added an Analyzer to LangGraph's source code to achieve the aforementioned requirements. Its main functionalities include:

  1. Path Connectivity Maintenance: In the presence of conditional nodes, use a greedy algorithm to find the maximum connectivity method for executing subsequent nodes linked to the conditional node.
  2. Trigger Condition Management: For example, Node E’s default trigger condition requires both Node C and Node D to execute successfully.
  3. Dynamic Path Adjustment: During execution, if conditional nodes are involved, the Analyzer retrieves the actual execution path of the conditional nodes and breaks the paths that cannot be executed in the directed graph, thereby dynamically adjusting Node E’s execution trigger conditions.

Execution in this Case:

  1. Type 1:
    • The selector activates the B branch, making the path A → C invalid.
    • The only valid path from the start node to E is A → B → D → E.
    • Node E’s trigger is set to D.
builder.add_conditional_edges(
    "A",
    lambda state: ["B"] if state["type"] == "one" else ["B", "C"],
    path_map=["B","C"],   
)
builder.add_edge(START, "A")
builder.add_edge("B", "D")
builder.add_edge("C", "E")
builder.add_edge("D", "E")
builder.add_edge("E", END)
graph = builder.compile(workflow_mode=True)
graph.invoke({"aggregate": [], "type": "one", "fanout_values": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B"]
Adding I'm E to ["I'm A", "I'm B", "I'm D"]
  1. Type 2:
    • The selector activates both B and C branches, making all paths valid.
    • The valid paths from the start node to E include two paths: A → C → E and A → B → D → E.
    • Node E’s trigger is set to require both C and D to execute.
def route_BC_or_B(state: State) -> Literal["B", "C"]:
    if state["type"] == "one":
        return ["B"]
    return ["B", "C"]
builder.add_conditional_edges(
    "A",
    route_BC_or_B,
)
builder.add_edge(START, "A")
builder.add_edge("B", "D")
builder.add_edge("C", "E")
builder.add_edge("D", "E")
builder.add_edge("E", END)
graph = builder.compile(workflow_mode=True)
graph.invoke({"aggregate": [], "type": "two", "fanout_values": []})
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
Adding I'm E to ["I'm A", "I'm B", "I'm C", "I'm D"]

Through these improvements, each node executes only once within the workflow. The trigger conditions for subsequent nodes are dynamically adjusted based on the actual execution paths, catering to different types of text processing requirements.


Usage

To use the improved execution mode in LangGraph, set workflow_mode=True in compile. The specific steps are as follows:

  1. Set the Option:

    Add workflow_mode=True in the compilation configuration to enable the dynamic workflow mode.

    from langgraph.graph import StateGraph
    
    builder = StateGraph(State)
    graph = builder.compile(workflow_mode=True)

    Note: When workflow_mode=True, either the path_map parameter or a Literal return is required. Only one of them is needed. The path_map or Literal must cover all possible execution paths.

  2. Default Behavior:

    If not set or if workflow_mode is set to False, the original execution mode of LangGraph is used, maintaining backward compatibility.

Copy link

vercel bot commented Feb 7, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
langgraph-docs-preview ⬜️ Ignored (Inspect) Visit Preview Feb 10, 2025 3:25am

@WenPulse WenPulse force-pushed the lwp/feature/add_workflow_mode branch from 2a4acb6 to 7a9bbe9 Compare February 7, 2025 03:14
@WenPulse WenPulse changed the title Dynamic Workflow Mode Implementation for Conditional Edges feature:Dynamic Workflow Mode Implementation for Conditional Edges Feb 7, 2025
@WenPulse WenPulse force-pushed the lwp/feature/add_workflow_mode branch from 7a9bbe9 to 98bd643 Compare February 7, 2025 08:16
@WenPulse WenPulse marked this pull request as ready for review February 7, 2025 08:18
Copy link
Contributor

@nfcampos nfcampos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, this would change langgraph to be a library for DAGs, which is very much the opposite of what we want, as we started langgraph with the explicit goal of supporting graphs with cycles

@WenPulse
Copy link
Author

WenPulse commented Feb 10, 2025

Hi, this would change langgraph to be a library for DAGs, which is very much the opposite of what we want, as we started langgraph with the explicit goal of supporting graphs with cycles

@nfcampos
Thank you for your comment. I believe these two aspects do not conflict. My configuration is designed to accommodate custom workflow configuration scenarios. When workflow_mode is not set to True, all operations will continue to function as before, thereby extending the use cases of langgraph. Regarding the issue of graph cycles you mentioned, they are still supported in my project. I integrate looping functionality within subgraphs by predefining certain nodes to handle loop operations.

My goal is to provide clients with low-code workflow configuration capabilities, offering functional nodes that users can freely arrange and combine as needed, without requiring programmers to manually write specific processes.

In my practical tests, I found that only condition_edges had discrepancies with the actual execution results. To address this, I temporarily fixed the issue using a monkey patch. However, I recognize that this is not a long-term solution, which is why I submitted this PR to provide a more robust fix. For example, as shown in my previous example, I simply added a D node after the B node. This does not change the overall execution logic of the workflow, yet it results in completely different execution outcomes. Specifically, whether the E node executes once or twice depends not on the actual execution logic but on whether the number of nodes following the B and C nodes is consistent. I believe this execution behavior is not rigorous. These changes aim to provide a clearer and more reliable execution path for workflows.

In fact,in my usage scenario, both langgraph with workflow_mode and langgraph without workflow_mode coexist simultaneously. The workflow is inherently a tool for user-defined processes, functioning similarly to other tools by providing output based on given input data. The PR I submitted is solely intended to fix issues in the original langgraph during workflow execution. Ultimately, the tools are invoked within agents, where the timing and repetition of calls are managed by the agent's langgraph (langgraph without workflow_mode). These two versions of langgraph can exist concurrently and operate completely independently without interfering with each other.

@WenPulse WenPulse force-pushed the lwp/feature/add_workflow_mode branch from d37f07d to 7b725f8 Compare February 10, 2025 03:21
@WenPulse WenPulse force-pushed the lwp/feature/add_workflow_mode branch from 7b725f8 to 263bbd9 Compare February 10, 2025 03:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants