diff --git a/python/beeai_framework/workflows/v2/decorators/_and.py b/python/beeai_framework/workflows/v2/decorators/_and.py new file mode 100644 index 000000000..e1710f7f8 --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/_and.py @@ -0,0 +1,16 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from beeai_framework.workflows.v2.types import AsyncMethod, AsyncMethodSet + + +def _and(*methods: AsyncMethod | str) -> AsyncMethodSet: + asm = AsyncMethodSet() + + for method in methods: + if isinstance(method, str): + asm.methods.append(method) + elif callable(method): + asm.methods.append(method.__name__) + + return asm diff --git a/python/beeai_framework/workflows/v2/decorators/_or.py b/python/beeai_framework/workflows/v2/decorators/_or.py new file mode 100644 index 000000000..157d5dda5 --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/_or.py @@ -0,0 +1,17 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from beeai_framework.workflows.v2.types import AsyncMethod, AsyncMethodSet + + +def _or(*methods: AsyncMethod | str) -> AsyncMethodSet: + asm = AsyncMethodSet() + + for method in methods: + if isinstance(method, str): + asm.methods.append(method) + elif callable(method): + asm.methods.append(method.__name__) + + asm.condition = "or" + return asm diff --git a/python/beeai_framework/workflows/v2/decorators/after.py b/python/beeai_framework/workflows/v2/decorators/after.py new file mode 100644 index 000000000..0290c4d87 --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/after.py @@ -0,0 +1,15 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from collections.abc import Callable + +from beeai_framework.workflows.v2.types import AsyncMethod, AsyncMethodSet + + +def after(dependency: str | AsyncMethod | AsyncMethodSet) -> Callable[[AsyncMethod], AsyncMethod]: + def decorator(func: AsyncMethod) -> AsyncMethod: + func._is_step = True # type: ignore[attr-defined] + func._dependency = dependency # type: ignore[attr-defined] + return func + + return decorator diff --git a/python/beeai_framework/workflows/v2/decorators/end.py b/python/beeai_framework/workflows/v2/decorators/end.py new file mode 100644 index 000000000..ce399a17c --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/end.py @@ -0,0 +1,20 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + + +from collections.abc import Callable, Coroutine +from typing import Any, Concatenate, ParamSpec, TypeVar + +from beeai_framework.backend.message import AnyMessage +from beeai_framework.workflows.v2.workflow import Workflow + +S = TypeVar("S", bound=Workflow) +P = ParamSpec("P") + +EndMethod = Callable[Concatenate[S, P], Coroutine[Any, Any, list[AnyMessage]]] + + +def end(func: EndMethod[S, P]) -> EndMethod[S, P]: + func._is_step = True # type: ignore[attr-defined] + func._is_end = True # type: ignore[attr-defined] + return func diff --git a/python/beeai_framework/workflows/v2/decorators/fork.py b/python/beeai_framework/workflows/v2/decorators/fork.py new file mode 100644 index 000000000..99dfd7093 --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/fork.py @@ -0,0 +1,9 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from beeai_framework.workflows.v2.types import AsyncMethod + + +def fork(func: AsyncMethod) -> AsyncMethod: + func._is_fork = True # type: ignore[attr-defined] + return func diff --git a/python/beeai_framework/workflows/v2/decorators/join.py b/python/beeai_framework/workflows/v2/decorators/join.py new file mode 100644 index 000000000..a298519df --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/join.py @@ -0,0 +1,9 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from beeai_framework.workflows.v2.types import AsyncMethod + + +def join(func: AsyncMethod) -> AsyncMethod: + func._is_join = True # type: ignore[attr-defined] + return func diff --git a/python/beeai_framework/workflows/v2/decorators/retry.py b/python/beeai_framework/workflows/v2/decorators/retry.py new file mode 100644 index 000000000..d367669ae --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/retry.py @@ -0,0 +1,15 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + + +from collections.abc import Callable + +from beeai_framework.workflows.v2.types import AsyncMethod + + +def retry(n: int = 1) -> Callable[[AsyncMethod], AsyncMethod]: + def decorator(func: AsyncMethod) -> AsyncMethod: + func._retries = n # type: ignore[attr-defined] + return func + + return decorator diff --git a/python/beeai_framework/workflows/v2/decorators/start.py b/python/beeai_framework/workflows/v2/decorators/start.py new file mode 100644 index 000000000..4f0c6e51c --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/start.py @@ -0,0 +1,19 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + + +from collections.abc import Callable, Coroutine +from typing import Any, TypeVar + +from beeai_framework.backend.message import AnyMessage +from beeai_framework.workflows.v2.workflow import Workflow + +S = TypeVar("S", bound=Workflow) +AnyCoroutine = Coroutine[Any, Any, Any] +StartMethod = Callable[[S, list[AnyMessage]], AnyCoroutine] + + +def start(func: StartMethod[S]) -> StartMethod[S]: + func._is_step = True # type: ignore[attr-defined] + func._is_start = True # type: ignore[attr-defined] + return func diff --git a/python/beeai_framework/workflows/v2/decorators/when.py b/python/beeai_framework/workflows/v2/decorators/when.py new file mode 100644 index 000000000..a76d729b8 --- /dev/null +++ b/python/beeai_framework/workflows/v2/decorators/when.py @@ -0,0 +1,20 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from collections.abc import Callable + +from beeai_framework.workflows.v2.types import AsyncMethod + +Predicate = Callable[..., bool] + + +def when(predicate: Predicate) -> Callable[[AsyncMethod], AsyncMethod]: + """ + Async decorator: runs the async function only if `predicate` returns True. + """ + + def decorator(func: AsyncMethod) -> AsyncMethod: + func._when_predicate = predicate # type: ignore[attr-defined] + return func + + return decorator diff --git a/python/beeai_framework/workflows/v2/events.py b/python/beeai_framework/workflows/v2/events.py new file mode 100644 index 000000000..95657fe57 --- /dev/null +++ b/python/beeai_framework/workflows/v2/events.py @@ -0,0 +1,40 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from pydantic import BaseModel, ConfigDict + +from beeai_framework.workflows.v2.step import WorkflowStep + + +class WorkflowStartEvent(BaseModel): ... + + +class WorkflowStepEvent(BaseModel): + step: WorkflowStep + model_config = ConfigDict(arbitrary_types_allowed=True) + + +class WorkflowStartStepEvent(WorkflowStepEvent): + pass + + +class WorkflowErrorEvent(WorkflowStepEvent): + error: Exception + attempt: int + + +class WorkflowRetryStepEvent(WorkflowStepEvent): + error: Exception + attempt: int + + +class WorkflowSuccessEvent(BaseModel): ... + + +workflow_v2_event_types: dict[str, type] = { + "start": WorkflowStartEvent, + "start_step": WorkflowStartStepEvent, + "error": WorkflowErrorEvent, + "retry_step": WorkflowRetryStepEvent, + "success": WorkflowSuccessEvent, +} diff --git a/python/beeai_framework/workflows/v2/step.py b/python/beeai_framework/workflows/v2/step.py new file mode 100644 index 000000000..17c7dfbca --- /dev/null +++ b/python/beeai_framework/workflows/v2/step.py @@ -0,0 +1,115 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, ConfigDict + +from beeai_framework.workflows.v2.decorators.when import Predicate +from beeai_framework.workflows.v2.types import AsyncMethod, ExecutionCondition + + +class WorkflowStepExecution(BaseModel): + inputs: list[Any] = [] + output: Any | None + error: Exception | None + started_at: datetime | None + ended_at: datetime | None + duration: float + + model_config = ConfigDict(arbitrary_types_allowed=True) + + +class WorkflowStep: + def __init__( + self, + func: AsyncMethod, + start: bool = False, + end: bool = False, + fork: bool = False, + join: bool = False, + retries: int = 0, + ) -> None: + self._func = func + self._name = func.__name__ + + self._is_start = start + self._is_end = end + self._is_fork = fork + self._is_join = join + self._retries = retries + + self._condition: ExecutionCondition = "and" + self.forked: list[WorkflowStep] = [] + self._dependencies: list[WorkflowStep] = [] + self._dependents: list[WorkflowStep] = [] + self.completed_dependencies: list[WorkflowStep] = [] + self.inputs: dict[int, Any] = {} + + self.completed_event = asyncio.Event() + self._predicates: list[Predicate] = [] + self._executions: list[WorkflowStepExecution] = [] + + @property + def name(self) -> str: + return self._name + + @property + def func(self) -> AsyncMethod: + return self._func + + @property + def is_start(self) -> bool: + return self._is_start + + @property + def is_end(self) -> bool: + return self._is_end + + @property + def is_fork(self) -> bool: + return self._is_fork + + @property + def is_join(self) -> bool: + return self._is_join + + @property + def retries(self) -> int: + return self._retries + + @property + def condition(self) -> ExecutionCondition: + return self._condition + + @condition.setter + def condition(self, value: ExecutionCondition) -> None: + self._condition = value + + def add_predicate(self, predicate: Predicate) -> None: + self._predicates.append(predicate) + + @property + def predicates(self) -> list[Predicate]: + return self._predicates + + @property + def executions(self) -> list[WorkflowStepExecution]: + return self._executions + + def add_dependency(self, dep: "WorkflowStep") -> None: + self._dependencies.append(dep) + dep._dependents.append(self) + + @property + def dependencies(self) -> list["WorkflowStep"]: + return self._dependencies + + @property + def dependents(self) -> list["WorkflowStep"]: + return self._dependents + + def last_execution(self) -> WorkflowStepExecution | None: + return self.executions[-1] if self.executions else None diff --git a/python/beeai_framework/workflows/v2/types.py b/python/beeai_framework/workflows/v2/types.py new file mode 100644 index 000000000..1a2097a05 --- /dev/null +++ b/python/beeai_framework/workflows/v2/types.py @@ -0,0 +1,16 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from collections.abc import Callable, Coroutine +from typing import Any, Literal + +from pydantic import BaseModel + +AsyncMethod = Callable[..., Coroutine[Any, Any, Any]] + +ExecutionCondition = Literal["and", "or"] + + +class AsyncMethodSet(BaseModel): + methods: list[str] = [] + condition: ExecutionCondition = "and" diff --git a/python/beeai_framework/workflows/v2/util.py b/python/beeai_framework/workflows/v2/util.py new file mode 100644 index 000000000..8eede6b96 --- /dev/null +++ b/python/beeai_framework/workflows/v2/util.py @@ -0,0 +1,82 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import inspect +from collections.abc import Awaitable, Callable +from typing import Any + + +def prepare_args(func: Callable[..., Any], self: Any, *inputs: Any) -> tuple[Any, ...]: + """ + Prepares positional args for a callable that may or may not expect `self`. + If `func` is already bound (e.g., obj.method), don't include `self`. + """ + sig = inspect.signature(func) + params = list(sig.parameters.keys()) + + if inspect.ismethod(func) or "self" not in params: + # already bound or doesn't expect self + return (*inputs,) + else: + # expects self explicitly + return (self, *inputs) + + +async def run_callable( + func: Callable[..., Any] | Callable[..., Awaitable[Any]], + *args: Any, + **kwargs: Any, +) -> Any: + """ + Dynamically inspects a callable (sync or async) and calls it with + only the arguments it accepts. + """ + sig = inspect.signature(func) + params = list(sig.parameters.values()) + + # Determine if the callable is bound + is_bound = hasattr(func, "__self__") and func.__self__ is not None + skip_first = False + + if params: + first_param = params[0] + if is_bound and first_param.name in ("self", "cls"): + skip_first = True + + filtered_args = [] + filtered_kwargs = {} + + arg_index = 0 + for i, param in enumerate(params): + if skip_first and i == 0: + continue + + if param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD): + if arg_index < len(args): + filtered_args.append(args[arg_index]) + arg_index += 1 + elif param.kind == inspect.Parameter.VAR_POSITIONAL: + filtered_args.extend(args[arg_index:]) + arg_index = len(args) + break + elif param.kind == inspect.Parameter.KEYWORD_ONLY or param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD: + if param.name in kwargs: + filtered_kwargs[param.name] = kwargs[param.name] + elif param.kind == inspect.Parameter.VAR_KEYWORD: + filtered_kwargs = kwargs + break + + # Bind safely + try: + bound = sig.bind_partial(*filtered_args, **filtered_kwargs) + bound.apply_defaults() + except TypeError: + bound = sig.bind_partial(**filtered_kwargs) + + # Call + if inspect.iscoroutinefunction(func): + result = await func(*bound.args, **bound.kwargs) + else: + result = func(*bound.args, **bound.kwargs) + + return result diff --git a/python/beeai_framework/workflows/v2/workflow.py b/python/beeai_framework/workflows/v2/workflow.py new file mode 100644 index 000000000..36f0abc82 --- /dev/null +++ b/python/beeai_framework/workflows/v2/workflow.py @@ -0,0 +1,282 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import inspect +import time +from datetime import UTC, datetime +from functools import cached_property +from itertools import zip_longest +from pathlib import Path +from typing import Any, Unpack + +from beeai_framework.backend.message import AnyMessage +from beeai_framework.context import RunContext, RunMiddlewareType +from beeai_framework.emitter.emitter import Emitter +from beeai_framework.runnable import Runnable, RunnableOptions, RunnableOutput, runnable_entry +from beeai_framework.workflows.v2.events import ( + WorkflowStartEvent, + WorkflowStartStepEvent, + workflow_v2_event_types, +) +from beeai_framework.workflows.v2.step import WorkflowStep, WorkflowStepExecution +from beeai_framework.workflows.v2.types import AsyncMethod, AsyncMethodSet +from beeai_framework.workflows.v2.util import prepare_args, run_callable + + +class Workflow(Runnable[RunnableOutput]): + def __init__(self, middlewares: list[RunMiddlewareType] | None = None) -> None: + super().__init__(middlewares=middlewares) + self._input: list[AnyMessage] = [] + self._output: list[AnyMessage] = [] + self._steps: dict[str, WorkflowStep] = {} + self._start_step: WorkflowStep | None = None + self._end_steps: set[WorkflowStep] = set() + self._running_tasks: set[asyncio.Task[Any]] = set() + self._queue: asyncio.Queue[WorkflowStep] = asyncio.Queue() + self._scan() + + def print_html(self, path: Path | str | None = None) -> None: + def to_mermaid(direction: str = "TD") -> list[str]: + lines = [f"flowchart-elk {direction}"] + visited = set() + + def dfs(step: WorkflowStep) -> None: + if step in visited: + return + visited.add(step) + dependents = step.dependents + for dep in dependents: + lines.append(f"\t_{step.name}({step.name}) --> _{dep.name}({dep.name})") + dfs(dep) + if not dependents: + lines.append(f"\t_{step.name}({step.name})") + + if self._start_step: + dfs(self._start_step) + + return lines + + mermaid_code_list = to_mermaid() + mermaid_code_list.append("classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c") + mermaid_code_list.append("classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40") + + if self._start_step: + mermaid_code_list.append(f"class _{self._start_step.name} _cls_start") + + for step in self._end_steps: + mermaid_code_list.append(f"class _{step.name} _cls_end") + + mermaid_code = "\n".join(mermaid_code_list) + default_filename = f"{self.__class__.__name__.lower()}.html" + + # If no path provided, write next to current module file + if path is None: + file_path = Path(__file__).parent / default_filename + else: + path = Path(path) + # If path is a directory, append default filename + file_path = path / default_filename if path.is_dir() or not path.suffix else path + + file_path.parent.mkdir(parents=True, exist_ok=True) + + html_template = f""" + + + + + Mermaid Diagram + + + +
+
+                        {mermaid_code}
+                    
+
+ + + + + """ + + file_path.write_text(html_template, encoding="utf-8") + + def inspect(self, step: AsyncMethod | str) -> WorkflowStep: + key = step if isinstance(step, str) else step.__name__ + return self._steps[key] + + def _scan(self) -> None: + """Scan decorated methods to build execution graph""" + methods = inspect.getmembers(self, inspect.ismethod) + + # Create all steps first + for name, method in methods: + if hasattr(method, "_is_step"): + is_start = hasattr(method, "_is_start") and method._is_start + is_end = hasattr(method, "_is_end") and method._is_end + is_fork = hasattr(method, "_is_fork") and method._is_fork + is_join = hasattr(method, "_is_join") and method._is_join + retries = method._retries if hasattr(method, "_retries") else 0 + + step = WorkflowStep(method, start=is_start, end=is_end, fork=is_fork, join=is_join, retries=retries) + self._steps[name] = step + + if step.is_start: + self._start_step = step + + if step.is_end: + self._end_steps.add(step) + + # TODO multiple predicate decorators + if hasattr(method, "_when_predicate"): + step.add_predicate(method._when_predicate) + + # Once all steps have been created build dependency graph + for name, method in methods: + if hasattr(method, "_dependency"): + dependency = method._dependency + if isinstance(dependency, str): + m = dict(methods).get(dependency) + if m is not None: + self._steps[name].add_dependency(self._steps[m.__name__]) + elif isinstance(dependency, AsyncMethodSet): + for method_name in dependency.methods: + m = dict(methods).get(method_name) + if m is not None: + self._steps[name].add_dependency(self._steps[m.__name__]) + self._steps[name].condition = dependency.condition + elif callable(dependency): + self._steps[name].add_dependency(self._steps[dependency.__name__]) + + async def _run(self) -> None: + assert self._start_step is not None + await self._queue.put(self._start_step) + + while not self._queue.empty() or self._running_tasks: + # Start all tasks in queue concurrently + while not self._queue.empty(): + step = await self._queue.get() + task = asyncio.create_task(self._run_step(step)) + self._running_tasks.add(task) + task.add_done_callback(self._running_tasks.discard) + + if self._running_tasks: + # Wait until any task completes + done, _ = await asyncio.wait(self._running_tasks, return_when=asyncio.FIRST_COMPLETED) + + # Raise exceptions for any finished tasks + for task in done: + exception = task.exception() + if exception: + raise exception # re-raise the exception from the task + + async def _run_step(self, step: WorkflowStep) -> None: + step_inputs: list[Any] = ( + [self._input] if step.is_start else [step.inputs[k] for k in sorted(step.inputs.keys())] + ) + + for p in step.predicates: + args = prepare_args(p, self, *step_inputs) + p_res = await run_callable(p, *args) + if not p_res: + return + + started_at = datetime.now(UTC) + start_perf = time.perf_counter() + + await RunContext.get().emitter.emit( + "start_step", + WorkflowStartStepEvent(step=step), + ) + + if step.is_start: + result = await run_callable(step.func, *step_inputs) + else: + if step.is_fork: + safe_params = [i if i is not None else [] for i in step_inputs] + params = list(zip_longest(*safe_params, fillvalue=None)) + tasks = [run_callable(step.func, *p) for p in params] + # TODO Retry forked + result = await asyncio.gather(*tasks) + elif step.is_join: + # Include the inputs to the original fork + fork_inputs = [[d.inputs[k] for k in sorted(d.inputs.keys())] for d in step.dependencies] + flat_fork_input = [item for sublist in fork_inputs for item in sublist] + inputs = flat_fork_input + step_inputs + result = await run_callable(step.func, *inputs) + else: + result = await run_callable(step.func, *step_inputs) + + step.executions.append( + WorkflowStepExecution( + inputs=step_inputs, + output=result, + error=None, + started_at=started_at, + ended_at=datetime.now(UTC), + duration=time.perf_counter() - start_perf, + ) + ) + + if step._is_end: + self._output = result or [] + + # Enqueue dependents (that are waiting on the completion of this step) + for dep in step.dependents: + # Insert this result into the inputs at correct index + idx = dep.dependencies.index(step) + + # Set the input at the correct dependency index + if dep.condition == "and": + dep.inputs[idx] = result + else: + dep.inputs[0] = result + + if dep.condition == "and": + dep.completed_dependencies.append(step) + if set(dep.completed_dependencies) == set(dep.dependencies): + await self._queue.put(dep) # All dependencies are done, queue dependent + elif dep.condition == "or": + await self._queue.put(dep) # Can queue immediately for OR + + def _create_emitter(self) -> Emitter: + return Emitter.root().child(namespace=["workflow", "v2"], creator=self, events=workflow_v2_event_types) + + @cached_property + def emitter(self) -> Emitter: + return self._create_emitter() + + @runnable_entry + async def run(self, input: list[AnyMessage], /, **kwargs: Unpack[RunnableOptions]) -> RunnableOutput: + run_context = RunContext.get() + + await run_context.emitter.emit( + "start", + WorkflowStartEvent(), + ) + + self._input = input + await self._run() + return RunnableOutput(output=self._output) diff --git a/python/beeai_framework/workflows/v3/events.py b/python/beeai_framework/workflows/v3/events.py new file mode 100644 index 000000000..d3467cf4b --- /dev/null +++ b/python/beeai_framework/workflows/v3/events.py @@ -0,0 +1,12 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from pydantic import BaseModel + + +class WorkflowStartEvent(BaseModel): ... + + +workflow_v3_event_types: dict[str, type] = { + "start": WorkflowStartEvent, +} diff --git a/python/beeai_framework/workflows/v3/step.py b/python/beeai_framework/workflows/v3/step.py new file mode 100644 index 000000000..0bf289b57 --- /dev/null +++ b/python/beeai_framework/workflows/v3/step.py @@ -0,0 +1,198 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, Literal + +from beeai_framework.workflows.v3.types import ( + AsyncStepFunction, + BranchCondition, + ControllerFunction, + StepLoopCondition, +) +from beeai_framework.workflows.v3.util import run_callable + + +class WorkflowBuilder: + def __init__(self, root: WorkflowStep) -> None: + self._root = root + self._current_step = root + self._steps: set[WorkflowStep] = {self._current_step} + + def then(self, next_steps: WorkflowStep | list[WorkflowStep]) -> WorkflowBuilder: + if isinstance(next_steps, list): + join_step = JoinWorkflowStep() + + for nxt in next_steps: + self._current_step.add_downstream_step(nxt) + nxt.add_upstream_step(self._current_step) + + nxt.add_downstream_step(join_step) + join_step.add_upstream_step(nxt, edge_type="and") + + self._steps.add(nxt) + + self._current_step = join_step + return self + else: + next_steps = [next_steps] + # for prev in self._frontier: + for nxt in next_steps: + self._current_step.add_downstream_step(nxt) + nxt.add_upstream_step(self._current_step) + + self._steps.add(nxt) + + self._current_step = nxt + return self + + def branch( + self, steps: dict[Any, WorkflowStep], branch_fn: ControllerFunction | None = None + ) -> AfterWorkflowBuilder: + # for prev in self._frontier: + # join_step = JoinWorkflowStep() + for key, nxt in steps.items(): + self._current_step.add_downstream_step(nxt, BranchCondition(fn=branch_fn, key=key)) + nxt.add_upstream_step(self._current_step) + self._steps.add(nxt) + # nxt.add_downstream_step(join_step, optional=True) + # join_step.add_upstream_step(nxt, optional=True) + return AfterWorkflowBuilder(builder=self) + + def after(self, step: WorkflowStep) -> WorkflowBuilder: + assert step in self._steps + self._current_step = step + return self + + # TODO + # def loop_until( + # self, + # step: WorkflowStep, + # until_fn: BooleanControllerFunction, + # ) -> WorkflowBuilder: + # step.loop_condition = StepLoopCondition( + # fn=until_fn, + # ) + + # for prev in self._frontier: + # prev.add_downstream_step(step) + # step.add_upstream_step(prev) + + # return WorkflowBuilder([step]) + + +class AfterWorkflowBuilder: + def __init__(self, builder: WorkflowBuilder) -> None: + self._builder = builder + + def after(self, step: WorkflowStep) -> WorkflowBuilder: + return self._builder.after(step) + + +class WorkflowEdge: + def __init__( + self, + source: WorkflowStep, + target: WorkflowStep, + condition: BranchCondition | None = None, + type: Literal["and", "or"] = "or", + ) -> None: + self.source = source + self.target = target + self.condition = condition + self.type: Literal["and", "or"] = type + + +class WorkflowStep(ABC): + def __init__(self) -> None: + super().__init__() + self._upstream_edges: list[WorkflowEdge] = [] + self._downstream_edges: list[WorkflowEdge] = [] + self.has_executed: bool = False + self.loop_condition: StepLoopCondition | None = None + + def add_upstream_step(self, step: WorkflowStep, edge_type: Literal["and", "or"] = "or") -> None: + self._upstream_edges.append(WorkflowEdge(step, self, type=edge_type)) + + def add_downstream_step( + self, step: WorkflowStep, condition: BranchCondition | None = None, edge_type: Literal["and", "or"] = "or" + ) -> None: + self._downstream_edges.append(WorkflowEdge(self, step, condition=condition, type=edge_type)) + + @property + def upstream_steps(self) -> list[WorkflowStep]: + return [edge.source for edge in self._upstream_edges] + + @property + def downstream_steps(self) -> list[WorkflowStep]: + return [edge.target for edge in self._downstream_edges] + + @property + @abstractmethod + def name(self) -> str: + pass + + @abstractmethod + async def execute(self, *inputs: Any) -> Any: + pass + + @property + def result(self) -> Any: + return None + + async def requeue(self, *inputs: Any) -> bool: + if self.loop_condition: + return bool(await run_callable(self.loop_condition.fn, *inputs)) + return False + + +class JoinWorkflowStep(WorkflowStep): + @property + def name(self) -> str: + return "__join__" + + async def execute(self, *inputs: Any) -> Any: + pass + + @property + def result(self) -> Any: + res = [] + for up in self.upstream_steps: + if up.has_executed: + res.append(up.result) + return res + + +class FuncWorkflowStep(WorkflowStep): + """ + Executes an async function/method. + """ + + def __init__( + self, + func: AsyncStepFunction | None = None, + ) -> None: + super().__init__() + self._func = func + self._result: Any = None + + @property + def name(self) -> str: + if self._func: + return self._func.__name__ + return "" + + async def execute(self, *inputs: Any) -> Any: + if self._func: + self._result = await run_callable(self._func, *inputs) + return self._result + + @property + def result(self) -> Any: + return self._result + + +class StartWorkflowStep(FuncWorkflowStep): + pass diff --git a/python/beeai_framework/workflows/v3/types.py b/python/beeai_framework/workflows/v3/types.py new file mode 100644 index 000000000..4fa6adfff --- /dev/null +++ b/python/beeai_framework/workflows/v3/types.py @@ -0,0 +1,26 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from collections.abc import Awaitable, Callable, Coroutine +from typing import Any + +from pydantic import BaseModel + +from beeai_framework.runnable import RunnableOutput + +ControllerFunction = Callable[..., Awaitable[Any]] +BooleanControllerFunction = Callable[..., Awaitable[bool]] +AsyncStepFunction = Callable[..., Coroutine[Any, Any, Any]] + + +class BranchCondition(BaseModel): + fn: ControllerFunction | None = None + key: Any + + +class StepLoopCondition(BaseModel): + fn: BooleanControllerFunction + + +# # End of workflow +EndStepMethod = Callable[..., Coroutine[Any, Any, RunnableOutput]] diff --git a/python/beeai_framework/workflows/v3/util.py b/python/beeai_framework/workflows/v3/util.py new file mode 100644 index 000000000..7c123a822 --- /dev/null +++ b/python/beeai_framework/workflows/v3/util.py @@ -0,0 +1,66 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import inspect +from collections.abc import Awaitable, Callable +from typing import Any + + +async def run_callable( + func: Callable[..., Any] | Callable[..., Awaitable[Any]], + *args: Any, + **kwargs: Any, +) -> Any: + """ + Dynamically inspects a callable (sync or async) and calls it with + only the arguments it accepts. + """ + sig = inspect.signature(func) + params = list(sig.parameters.values()) + + # Determine if the callable is bound + is_bound = hasattr(func, "__self__") and func.__self__ is not None + skip_first = False + + if params: + first_param = params[0] + if is_bound and first_param.name in ("self", "cls"): + skip_first = True + + filtered_args = [] + filtered_kwargs = {} + + arg_index = 0 + for i, param in enumerate(params): + if skip_first and i == 0: + continue + + if param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD): + if arg_index < len(args): + filtered_args.append(args[arg_index]) + arg_index += 1 + elif param.kind == inspect.Parameter.VAR_POSITIONAL: + filtered_args.extend(args[arg_index:]) + arg_index = len(args) + break + elif param.kind == inspect.Parameter.KEYWORD_ONLY or param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD: + if param.name in kwargs: + filtered_kwargs[param.name] = kwargs[param.name] + elif param.kind == inspect.Parameter.VAR_KEYWORD: + filtered_kwargs = kwargs + break + + # Bind safely + try: + bound = sig.bind_partial(*filtered_args, **filtered_kwargs) + bound.apply_defaults() + except TypeError: + bound = sig.bind_partial(**filtered_kwargs) + + # Call + if inspect.iscoroutinefunction(func): + result = await func(*bound.args, **bound.kwargs) + else: + result = func(*bound.args, **bound.kwargs) + + return result diff --git a/python/beeai_framework/workflows/v3/workflow.py b/python/beeai_framework/workflows/v3/workflow.py new file mode 100644 index 000000000..2eef558d4 --- /dev/null +++ b/python/beeai_framework/workflows/v3/workflow.py @@ -0,0 +1,159 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from abc import ABC, abstractmethod +from functools import cached_property +from typing import Any, TypeVar, Unpack, overload + +from beeai_framework.backend.message import AnyMessage +from beeai_framework.context import RunMiddlewareType +from beeai_framework.emitter.emitter import Emitter +from beeai_framework.runnable import Runnable, RunnableOptions, RunnableOutput, runnable_entry +from beeai_framework.workflows.v3.events import ( + workflow_v3_event_types, +) +from beeai_framework.workflows.v3.step import ( + FuncWorkflowStep, + JoinWorkflowStep, + StartWorkflowStep, + WorkflowBuilder, + WorkflowStep, +) +from beeai_framework.workflows.v3.types import AsyncStepFunction +from beeai_framework.workflows.v3.util import run_callable + +T = TypeVar("T", bound="Workflow") + + +class step: # noqa: N801 + """ + Descriptor that turns an async method into a WorkflowStep. + Users can refer to decorated methods and treat as WorkflowStep for composition. + """ + + def __init__(self, func: AsyncStepFunction) -> None: + self.func = func + self.name = func.__name__ + + def __set_name__(self, owner: T, name: str) -> None: + self.name = name + + @overload + def __get__(self, instance: None, owner: type[T]) -> "step": ... + @overload + def __get__(self, instance: T, owner: type[T]) -> WorkflowStep: ... + + def __get__(self, instance: T | None, owner: type[T]) -> Any: + if instance is None: + return self # accessed on class, not instance + + cache_name = f"__workflow_step_cache_{self.name}" + if not hasattr(instance, cache_name): + bound_func = self.func.__get__(instance, owner) + setattr(instance, cache_name, FuncWorkflowStep(func=bound_func)) + return getattr(instance, cache_name) + + +class Workflow(Runnable[RunnableOutput], ABC): + def __init__(self, middlewares: list[RunMiddlewareType] | None = None) -> None: + super().__init__(middlewares=middlewares) + + def _create_emitter(self) -> Emitter: + return Emitter.root().child(namespace=["workflow", "v3"], creator=self, events=workflow_v3_event_types) + + @cached_property + def emitter(self) -> Emitter: + return self._create_emitter() + + @runnable_entry + async def run(self, input: list[AnyMessage], /, **kwargs: Unpack[RunnableOptions]) -> RunnableOutput: + # Builds out the execution graph + self._start_step: WorkflowStep = StartWorkflowStep(func=self.start) + self.build(WorkflowBuilder(root=self._start_step)) + + # Execute the workflow rooted at the start node + queue: asyncio.Queue[Any] = asyncio.Queue() + await queue.put(self._start_step) + + # Track running tasks + tasks: set[asyncio.Task[Any]] = set() + completed_steps: list[WorkflowStep] = [] + + async def execute_step(step: WorkflowStep) -> None: + # Send run input and kwargs to start step, otherwise get from upstream + + results = [] + + # TODO: Bind input + if isinstance(step, StartWorkflowStep): + results = [input, kwargs] + else: + for us in step.upstream_steps: + # Joins return aggregate + if isinstance(us, JoinWorkflowStep): + results.extend(us.result) + else: + results.append(us.result) + + print("Executing:", step.name) + await step.execute(*results) + completed_steps.append(step) + + if await step.requeue(*results): + await queue.put(step) + return + + step.has_executed = True + + # Enqueue downstream + for ds_edge in step._downstream_edges: + ds = ds_edge.target + enqueue_ds = True + # Check all upstream edges to determine execution conditions + for up_edge in ds._upstream_edges: + up = up_edge.source + + # If upstream has not executed and is its an and edge dont queue + if not up.has_executed and up_edge.type == "and": + enqueue_ds = False + break + + # Check for conditional execution + if enqueue_ds and ds_edge.condition is not None: + if ds_edge.condition.fn: + enqueue_ds = bool(await run_callable(ds_edge.condition.fn, *results) == ds_edge.condition.key) + else: + enqueue_ds = bool(step.result == ds_edge.condition.key) + + if enqueue_ds: + ds.has_executed = False + await queue.put(ds) + + while not queue.empty() or tasks: + # Drain current queue items into tasks + while not queue.empty(): + step = await queue.get() + task = asyncio.create_task(execute_step(step)) + tasks.add(task) + task.add_done_callback(tasks.discard) + queue.task_done() + + if tasks: + # done, _ = await asyncio.wait(tasks) + done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + output: RunnableOutput = ( + completed_steps[-1].result + if completed_steps and isinstance(completed_steps[-1].result, RunnableOutput) + else RunnableOutput(output=[]) + ) + return output + + @abstractmethod + def build(self, start: WorkflowBuilder) -> None: + pass + + @abstractmethod + async def start(self, input: list[AnyMessage], /, **kwargs: Unpack[RunnableOptions]) -> Any: + pass diff --git a/python/examples/workflows/v2/calculate_tokens/calculate_tokens.py b/python/examples/workflows/v2/calculate_tokens/calculate_tokens.py new file mode 100644 index 000000000..b67dfef84 --- /dev/null +++ b/python/examples/workflows/v2/calculate_tokens/calculate_tokens.py @@ -0,0 +1,63 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import re +from pathlib import Path + +from beeai_framework.backend.message import AnyMessage, AssistantMessage, UserMessage +from beeai_framework.workflows.v2.decorators._or import _or +from beeai_framework.workflows.v2.decorators.after import after +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.decorators.when import when +from beeai_framework.workflows.v2.workflow import Workflow + + +class CalculateTokensWorkflow(Workflow): + def __init__(self) -> None: + super().__init__() + + @start + async def convert_to_text(self, messages: list[AnyMessage]) -> str: + return "".join(msg.text for msg in messages) + + @after(convert_to_text) + @when(lambda text: len(text) < 1000) + async def count_tokens_by_whitespaces(self, text: str) -> int: + print("count_tokens_by_whitespaces") + return len(text.split(" ")) + + @after(convert_to_text) + @when(lambda text: len(text) >= 1000) + async def count_tokens_regex(self, text: str) -> int: + print("count_tokens_regex") + tokens = re.findall(r"\w+|[^\w\s]", text, re.UNICODE) + return len(tokens) + + @after(_or(count_tokens_by_whitespaces, count_tokens_regex)) + @end + async def finalize( + self, white_space_tokens: int | None = None, count_regex_tokens: int | None = None + ) -> list[AnyMessage]: + token_count = 0 + + if white_space_tokens is not None: + token_count = white_space_tokens + elif count_regex_tokens is not None: + token_count = count_regex_tokens + + return [AssistantMessage(f"Total tokens: {token_count}")] + + +# Async main function +async def main() -> None: + workflow = CalculateTokensWorkflow() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run([UserMessage("Hello world!")]) + print(output.last_message.text) + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/calculate_tokens/workflow.html b/python/examples/workflows/v2/calculate_tokens/workflow.html new file mode 100644 index 000000000..c790952fe --- /dev/null +++ b/python/examples/workflows/v2/calculate_tokens/workflow.html @@ -0,0 +1,49 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_convert_to_text(convert_to_text) --> _count_tokens_by_whitespaces(count_tokens_by_whitespaces)
+	_count_tokens_by_whitespaces(count_tokens_by_whitespaces) --> _finalize(finalize)
+	_finalize(finalize)
+	_convert_to_text(convert_to_text) --> _count_tokens_regex(count_tokens_regex)
+	_count_tokens_regex(count_tokens_regex) --> _finalize(finalize)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _convert_to_text _cls_start
+class _finalize _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v2/confidence/confidence.py b/python/examples/workflows/v2/confidence/confidence.py new file mode 100644 index 000000000..779ddddbc --- /dev/null +++ b/python/examples/workflows/v2/confidence/confidence.py @@ -0,0 +1,60 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from pathlib import Path + +from pydantic import BaseModel, Field + +from beeai_framework.backend.chat import ChatModel +from beeai_framework.backend.message import AnyMessage, AssistantMessage, UserMessage +from beeai_framework.workflows.v2.decorators.after import after +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.workflow import Workflow + + +class ResponseWithConfidence(BaseModel): + response: str = Field(description="Comprehensive response.") + confidence: int = Field( + description="How confident are you in the correctness of the response? Chose a value between 1 and 10, 1 being lowest, 10 being highest." + ) + + +class RespondWithConfidenceWorkflow(Workflow): + def __init__(self) -> None: + super().__init__() + self.chat_model: ChatModel = ChatModel.from_name("ollama:gpt-oss:20b") + + @start + async def start(self, input: list[AnyMessage]) -> list[AnyMessage]: + print("Start") + return input + + @after(start) + async def answer(self, messages: list[AnyMessage]) -> ResponseWithConfidence: + print("Generating response") + output = await self.chat_model.run(messages, response_format=ResponseWithConfidence) + assert output.output_structured is not None + return ResponseWithConfidence(**output.output_structured.model_dump()) + + @after(answer) + @end + async def end(self, response: ResponseWithConfidence) -> list[AnyMessage]: + content = f"{response.response}\nConfidence: {response.confidence}/10" + return [AssistantMessage(content)] + + +# Async main function +async def main() -> None: + workflow = RespondWithConfidenceWorkflow() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run([UserMessage("What is at the center of a black hole?")]) + print(output.last_message.text) + output = await workflow.run([UserMessage("What is 10 + 10?")]) + print(output.last_message.text) + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/confidence/workflow.html b/python/examples/workflows/v2/confidence/workflow.html new file mode 100644 index 000000000..efe09ffb9 --- /dev/null +++ b/python/examples/workflows/v2/confidence/workflow.html @@ -0,0 +1,47 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_start(start) --> _answer(answer)
+	_answer(answer) --> _end(end)
+	_end(end)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _start _cls_start
+class _end _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v2/empty/empty.py b/python/examples/workflows/v2/empty/empty.py new file mode 100644 index 000000000..e67d02523 --- /dev/null +++ b/python/examples/workflows/v2/empty/empty.py @@ -0,0 +1,48 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from pathlib import Path + +from pydantic import BaseModel + +from beeai_framework.backend.message import AnyMessage, UserMessage +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.workflow import Workflow + + +class Page(BaseModel): + link: str + content: str + + +class WebScrapperWorkflow(Workflow): + def __init__(self) -> None: + super().__init__() + + @start + @end + async def identity(self, messages: list[AnyMessage]) -> list[AnyMessage]: + return messages + + +# Async main function +async def main() -> None: + workflow = WebScrapperWorkflow() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run( + [ + UserMessage( + "Imagine we receive a signal from an intelligent extraterrestrial civilization. How should we interpret it, what assumptions should we question, and what could be the global implications of responding?" + ) + ] + ) + + for m in output.output: + print(f"{m.role}: {m.text}") + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/empty/workflow.html b/python/examples/workflows/v2/empty/workflow.html new file mode 100644 index 000000000..00fd6866a --- /dev/null +++ b/python/examples/workflows/v2/empty/workflow.html @@ -0,0 +1,45 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_identity(identity)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _identity _cls_start
+class _identity _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v2/loop_until_done/loop_until_done.py b/python/examples/workflows/v2/loop_until_done/loop_until_done.py new file mode 100644 index 000000000..1bad02a84 --- /dev/null +++ b/python/examples/workflows/v2/loop_until_done/loop_until_done.py @@ -0,0 +1,51 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import random +from pathlib import Path + +from beeai_framework.backend.message import AnyMessage, AssistantMessage, UserMessage +from beeai_framework.workflows.v2.decorators._or import _or +from beeai_framework.workflows.v2.decorators.after import after +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.decorators.when import when +from beeai_framework.workflows.v2.workflow import Workflow + + +class LoopUntilDoneWorkflow(Workflow): + def __init__(self) -> None: + super().__init__() + self.complete = False + + @start + async def start(self, input: list[AnyMessage]) -> list[AnyMessage]: + print("Start") + return input + + @after(_or(start, "loop")) + @when(lambda self: not self.complete) + async def loop(self) -> None: + num = random.random() + print(num) + if num < 0.1: + self.complete = True + + @end + async def end(self) -> list[AnyMessage]: + print("Done!") + return [AssistantMessage("Done!")] + + +# Async main function +async def main() -> None: + workflow = LoopUntilDoneWorkflow() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run([UserMessage("Hello!")]) + print(output.last_message.text) + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/loop_until_done/workflow.html b/python/examples/workflows/v2/loop_until_done/workflow.html new file mode 100644 index 000000000..05ebe6114 --- /dev/null +++ b/python/examples/workflows/v2/loop_until_done/workflow.html @@ -0,0 +1,46 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_start(start) --> _loop(loop)
+	_loop(loop) --> _loop(loop)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _start _cls_start
+class _end _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v2/reflection/reflection.py b/python/examples/workflows/v2/reflection/reflection.py new file mode 100644 index 000000000..b6bf3805f --- /dev/null +++ b/python/examples/workflows/v2/reflection/reflection.py @@ -0,0 +1,117 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from pathlib import Path +from typing import cast + +from pydantic import BaseModel, Field + +from beeai_framework.backend.chat import ChatModel +from beeai_framework.backend.message import AnyMessage, AssistantMessage, SystemMessage, UserMessage +from beeai_framework.workflows.v2.decorators._or import _or +from beeai_framework.workflows.v2.decorators.after import after +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.decorators.when import when +from beeai_framework.workflows.v2.workflow import Workflow + + +class ResponseWithReflection(BaseModel): + response: str + reflection: str = Field(description="A helpful critique of the most recent assistant message.") + + +def sys_prompt( + reflection: ResponseWithReflection | None = None, +) -> str: + prompt = "You are a helpful and knowledgeable AI assistant that provides accurate, clear, and concise responses to user queries." + + if reflection: + prompt += f""" +Here is your previous response and a helpful critique. +Your response should be an iterative improvement of your previous response, taking the critique into account. + +Previous Response: {reflection.response} +Critique: {reflection.reflection} +""" + return prompt + + +def reflect_prompt() -> str: + return """Analyze the last assistant response, assess its quality, limit your review to 2 lines including suggestions for improvement.""" + + +class SelfReflectionWorkflow(Workflow): + def __init__(self) -> None: + super().__init__() + self.messages: list[AnyMessage] = [] + self.num_iterations = 3 + self.chat_model: ChatModel = ChatModel.from_name("ollama:ibm/granite4") + self.reflect_model: ChatModel = ChatModel.from_name("ollama:ibm/granite4") + + @start + async def start(self, input: list[AnyMessage]) -> None: + print("Start") + self.messages = input + self.response: str | None = None + + @after(start) + async def answer(self) -> str: + """Generate response""" + output = await self.chat_model.run([SystemMessage(content=sys_prompt()), *self.messages]) + self.response = output.get_text_content() + print("\nAnswer", ("*" * 20), "\n") + print(self.response) + return self.response + + @after("reflect") + async def answer_with_reflection(self, reflection: ResponseWithReflection) -> str: + """Generate response""" + output = await self.chat_model.run([SystemMessage(content=sys_prompt(reflection=reflection)), *self.messages]) + self.response = output.get_text_content() + print("\nAnswer + reflection", ("*" * 20), "\n") + print(self.response) + return self.response + + @after(_or(answer, answer_with_reflection)) + @when(lambda self: self.num_iterations > 0) + async def reflect(self, response: str) -> ResponseWithReflection: + """Reflect on the response""" + self.num_iterations -= 1 + last_exec = self.inspect(self.start).last_execution() + raw_inputs = last_exec.inputs[0] if last_exec is not None else [] + messages: list[AnyMessage] = cast(list[AnyMessage], raw_inputs) + + output = await self.reflect_model.run( + [*messages, AssistantMessage(content=response), UserMessage(content=reflect_prompt())], + ) + print("\nReflection", ("*" * 20), "\n") + print(output.get_text_content()) + return ResponseWithReflection(response=response, reflection=output.get_text_content()) + + @end + @after(_or(answer, answer_with_reflection)) + @when(lambda self: self.num_iterations <= 0) + async def end(self) -> list[AnyMessage]: + return [AssistantMessage(self.response or "")] + + +# Async main function +async def main() -> None: + workflow = SelfReflectionWorkflow() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run( + [ + UserMessage( + content="If human memory is reconstructive rather than reproductive, how might that influence the reliability of eyewitness testimony in court?" + ) + ] + ) + print("\nFinal answer", ("*" * 20), "\n") + print(f"{output.last_message.text}") + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/reflection/workflow.html b/python/examples/workflows/v2/reflection/workflow.html new file mode 100644 index 000000000..b610a8e13 --- /dev/null +++ b/python/examples/workflows/v2/reflection/workflow.html @@ -0,0 +1,51 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_start(start) --> _answer(answer)
+	_answer(answer) --> _end(end)
+	_end(end)
+	_answer(answer) --> _reflect(reflect)
+	_reflect(reflect) --> _answer_with_reflection(answer_with_reflection)
+	_answer_with_reflection(answer_with_reflection) --> _end(end)
+	_answer_with_reflection(answer_with_reflection) --> _reflect(reflect)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _start _cls_start
+class _end _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v2/rewoo/rewoo.py b/python/examples/workflows/v2/rewoo/rewoo.py new file mode 100644 index 000000000..c2dd54d15 --- /dev/null +++ b/python/examples/workflows/v2/rewoo/rewoo.py @@ -0,0 +1,223 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from pathlib import Path + +from pydantic import BaseModel, Field + +from beeai_framework.backend.chat import ChatModel +from beeai_framework.backend.message import AnyMessage, AssistantMessage, SystemMessage, UserMessage +from beeai_framework.workflows.v2.decorators._or import _or +from beeai_framework.workflows.v2.decorators.after import after +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.fork import fork +from beeai_framework.workflows.v2.decorators.join import join +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.decorators.when import when +from beeai_framework.workflows.v2.workflow import Workflow + + +class Task(BaseModel): + task: str = Field("The task to perform.") + context: str = Field("Context that is important when performing the task.") + + +class Step(BaseModel): + """Stores information about a task.""" + + id: str + problem: str + # dependencies: list["Step"] + + +class Plan(BaseModel): + steps: list[Step] = Field(description="A list of steps necessary to complete the task.") + + +class StepResult(BaseModel): + """Stores the result of a completed task.""" + + id: str + result: str + + +class Critique(BaseModel): + """Stores critique feedback and score.""" + + score: int = Field(description="Quality score between 0 and 100") + suggestions: str + + +class ReWOOAgent(Workflow): + def __init__(self) -> None: + super().__init__() + self.chat_model: ChatModel = ChatModel.from_name("ollama:ibm/granite4") + self.task: Task | None = None + self.current_plan: Plan | None = None + self.results: list[StepResult] = [] + self.reset_plan_budget() + + def reset_plan_budget(self) -> None: + self.replan_budget: int = 3 + + @start + async def process_input(self, input: list[AnyMessage]) -> None: + """Extracts the task and context from the conversation.""" + print("Processing input") + self.reset_plan_budget() + result = await self.chat_model.run( + [SystemMessage(content="Extract the task and any important context from the user message."), *input], + response_format=Task, + ) + self.task = Task(**result.output_structured.model_dump()) if result.output_structured else None + + @after(process_input) + @when(lambda self: self.task is None) + @end + async def no_task(self) -> list[AnyMessage]: + """Ends the workflow if there is no task to solve.""" + return [AssistantMessage("Hello, how can I help you?")] + + @after(process_input) + @when(lambda self: self.task is not None) + async def planner(self) -> list[Step]: + """Creates a plan (list of steps) based on the task and context.""" + print("Planner") + assert self.task is not None + print(self.task.model_dump_json(indent=4)) + + result = await self.chat_model.run( + [ + SystemMessage( + "\n".join( + [ + "Create a plan to address the given task.", + "The plan should incorporate the context and include a minimal number of steps.", + f"Task: {self.task.task}\n", + f"Context: {self.task.context}", + ] + ) + ) + ], + response_format=Plan, + ) + + assert result.output_structured is not None + self.plan = Plan(**result.output_structured.model_dump()) + return self.plan.steps + + @after(_or("planner", "replan")) + @fork + async def executor(self, step: Step) -> StepResult: + """Executes plan steps in parallel""" + print("Executor") + prompt = f"Provide a short (2 line max) solution to the following task:\n Task: {step.problem}" + result = await self.chat_model.run([UserMessage(prompt)]) + print("Executor complete") + return StepResult(id=step.id, result=result.get_text_content()) + + @after(executor) + @join + async def critique(self, steps: list[Step], results: list[StepResult]) -> Critique: + print("Critique") + self.results = results + + """Evaluates the plan and solutions.""" + assert self.task is not None + assert self.plan is not None + + result = await self.chat_model.run( + [ + AssistantMessage( + "\n".join( + [ + "Provide a critical review of the following plan that is designed to solve the given task.", + "If the plan is not sufficient, propose concrete improvements.", + "The solution is only provided for context, do not critique it.", + "Score the plan between 0 and 100. Try to provide an accurate score.", + f"Task: {self.task.model_dump_json(indent=4)}", + f"Plan: {self.plan.model_dump_json(indent=4)}", + f"Solution: {[r.model_dump_json(indent=4) for r in results]}", + ] + ) + ), + ], + response_format=Critique, + ) + + assert result.output_structured is not None + critique = Critique(**result.output_structured.model_dump()) + print(critique.model_dump_json(indent=4)) + return critique + + @after(critique) + @when(lambda self, critique: critique.score < 90 and self.replan_budget > 0) + async def replan(self, critique: Critique) -> list[Step]: + """If the critique score is low, create a new plan based on feedback.""" + print("Replan") + + assert self.task is not None + assert self.plan is not None + + result = await self.chat_model.run( + [ + SystemMessage( + "\n".join( + [ + "Revise the following plan based on the critique. You can edit existing steps, add steps or remove steps.", + f"Task: {self.task.model_dump_json(indent=4)}", + f"Current plan: {self.plan.model_dump_json(indent=4)}", + f"Critique: {critique.model_dump_json(indent=4)}", + ] + ) + ) + ], + response_format=Plan, + ) + self.replan_budget -= 1 + assert result.output_structured is not None + self.plan = Plan(**result.output_structured.model_dump()) + return self.plan.steps + + @after(critique) + @when(lambda self, critique: critique.score >= 90 or self.replan_budget == 0) + async def solver(self) -> str: + print("solver") + + assert self.plan is not None + assert self.task is not None + + print(self.plan.model_dump_json(indent=4)) + + solution = await self.chat_model.run( + [ + SystemMessage( + f"Based on the plan and solutions, provide the final answer.\n" + f"Task: {self.task.model_dump_json(indent=4)}\n" + f"Plan: {self.plan.model_dump_json(indent=4)}\n" + f"Solution: {[r.model_dump_json(indent=4) for r in self.results]}" + ) + ] + ) + return solution.get_text_content() + + @end + @after(solver) + async def finalize(self, solution: str) -> list[AssistantMessage]: + """Returns the final answer as a message.""" + print("Finalize") + return [AssistantMessage(solution)] + + +# Async main function +async def main() -> None: + workflow = ReWOOAgent() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run([UserMessage("What is at the center of a black hole?")]) + print(output.last_message.text) + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/rewoo/workflow.html b/python/examples/workflows/v2/rewoo/workflow.html new file mode 100644 index 000000000..cf57d6aa3 --- /dev/null +++ b/python/examples/workflows/v2/rewoo/workflow.html @@ -0,0 +1,55 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_process_input(process_input) --> _no_task(no_task)
+	_no_task(no_task)
+	_process_input(process_input) --> _planner(planner)
+	_planner(planner) --> _executor(executor)
+	_executor(executor) --> _critique(critique)
+	_critique(critique) --> _replan(replan)
+	_replan(replan) --> _executor(executor)
+	_critique(critique) --> _solver(solver)
+	_solver(solver) --> _finalize(finalize)
+	_finalize(finalize)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _process_input _cls_start
+class _finalize _cls_end
+class _no_task _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v2/slow_vs_fast/slow_vs_fast.py b/python/examples/workflows/v2/slow_vs_fast/slow_vs_fast.py new file mode 100644 index 000000000..879d796d1 --- /dev/null +++ b/python/examples/workflows/v2/slow_vs_fast/slow_vs_fast.py @@ -0,0 +1,56 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from pathlib import Path + +from beeai_framework.backend.message import AnyMessage, AssistantMessage, UserMessage +from beeai_framework.workflows.v2.decorators._and import _and +from beeai_framework.workflows.v2.decorators._or import _or +from beeai_framework.workflows.v2.decorators.after import after +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.decorators.when import when +from beeai_framework.workflows.v2.workflow import Workflow + + +class SlowVsFastWorkflow(Workflow): + def __init__(self) -> None: + super().__init__() + self.fast_runs = 5 + + @start + async def start(self, input: list[AnyMessage]) -> list[AnyMessage]: + return input + + @after(start) + async def slow(self) -> None: + """Slow running operation""" + await asyncio.sleep(10) + print("Slow complete!") + + @after(_or(start, "fast")) + @when(lambda self: self.fast_runs > 0) + async def fast(self) -> None: + """Fast running operation""" + await asyncio.sleep(1) + self.fast_runs -= 1 + print("Fast complete!") + + @after(_and(slow, fast)) + @end + async def end(self) -> list[AnyMessage]: + return [AssistantMessage("Fast and slow complete!")] + + +# Async main function +async def main() -> None: + workflow = SlowVsFastWorkflow() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run([UserMessage("Hello")]) + print(output.last_message.text) + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/slow_vs_fast/workflow.html b/python/examples/workflows/v2/slow_vs_fast/workflow.html new file mode 100644 index 000000000..786f3c51b --- /dev/null +++ b/python/examples/workflows/v2/slow_vs_fast/workflow.html @@ -0,0 +1,50 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_start(start) --> _fast(fast)
+	_fast(fast) --> _end(end)
+	_end(end)
+	_fast(fast) --> _fast(fast)
+	_start(start) --> _slow(slow)
+	_slow(slow) --> _end(end)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _start _cls_start
+class _end _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v2/think_before_answer/think_before_answer.py b/python/examples/workflows/v2/think_before_answer/think_before_answer.py new file mode 100644 index 000000000..b611a3669 --- /dev/null +++ b/python/examples/workflows/v2/think_before_answer/think_before_answer.py @@ -0,0 +1,78 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from pathlib import Path + +from beeai_framework.backend.chat import ChatModel +from beeai_framework.backend.message import AnyMessage, AssistantMessage, UserMessage +from beeai_framework.backend.types import ChatModelOutput +from beeai_framework.workflows.v2.decorators._and import _and +from beeai_framework.workflows.v2.decorators.after import after +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.workflow import Workflow + + +def thinking_prompt(user_message: AnyMessage) -> str: + return f""""Given a user message, analyze and reason about it deeply. +Do not generate a reply. Focus entirely on understanding implications, context, assumptions, and possible interpretations. +User's Message: {user_message.text}""" + + +def answer_prompt(user_message: AnyMessage, thoughts: str) -> str: + return f""""You have access to the internal reasoning about the user's message. +Generate a clear, concise, and contextually appropriate reply based on that reasoning. +Do not introduce unrelated ideas; your answer should directly reflect the thought process that was internally generated. +Internal reasoning: {thoughts} +User's Message: {user_message.text}""" + + +class ThinkBeforeAnswerWorkflow(Workflow): + def __init__(self) -> None: + super().__init__() + self.chat_model: ChatModel = ChatModel.from_name("ollama:ibm/granite4") + + @start + async def start(self, input: list[AnyMessage]) -> list[AnyMessage]: + print("Start") + return input + + @after(start) + async def think(self, messages: list[AnyMessage]) -> str: + print("Thinking") + prompt = thinking_prompt(messages[-1]) + output: ChatModelOutput = await self.chat_model.run([UserMessage(content=prompt)]) + return output.get_text_content() + + @after(_and(start, think)) + async def answer(self, messages: list[AnyMessage], thoughts: str) -> AssistantMessage: + print("Answering") + prompt = answer_prompt(messages[-1], thoughts) + output: ChatModelOutput = await self.chat_model.run([UserMessage(content=prompt)]) + return AssistantMessage(output.get_text_content()) + + @after(answer) + @end + async def end(self, msg: AssistantMessage) -> list[AnyMessage]: + print("End") + return [msg] + + +# Async main function +async def main() -> None: + workflow = ThinkBeforeAnswerWorkflow() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run( + [ + UserMessage( + "Imagine we receive a signal from an intelligent extraterrestrial civilization. How should we interpret it, what assumptions should we question, and what could be the global implications of responding?" + ) + ] + ) + print(output.last_message.text) + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/think_before_answer/workflow.html b/python/examples/workflows/v2/think_before_answer/workflow.html new file mode 100644 index 000000000..31e4d64c1 --- /dev/null +++ b/python/examples/workflows/v2/think_before_answer/workflow.html @@ -0,0 +1,49 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_start(start) --> _answer(answer)
+	_answer(answer) --> _end(end)
+	_end(end)
+	_start(start) --> _think(think)
+	_think(think) --> _answer(answer)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _start _cls_start
+class _end _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v2/web_scraper/web_scraper.py b/python/examples/workflows/v2/web_scraper/web_scraper.py new file mode 100644 index 000000000..202ff66e5 --- /dev/null +++ b/python/examples/workflows/v2/web_scraper/web_scraper.py @@ -0,0 +1,74 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from pathlib import Path + +from pydantic import BaseModel + +from beeai_framework.backend.message import AnyMessage, AssistantMessage, UserMessage +from beeai_framework.workflows.v2.decorators.after import after +from beeai_framework.workflows.v2.decorators.end import end +from beeai_framework.workflows.v2.decorators.fork import fork +from beeai_framework.workflows.v2.decorators.join import join +from beeai_framework.workflows.v2.decorators.start import start +from beeai_framework.workflows.v2.workflow import Workflow + + +class Page(BaseModel): + link: str + content: str + + +class WebScrapperWorkflow(Workflow): + def __init__(self) -> None: + super().__init__() + self.url_to_content = { + "https://www.wikipedia.org": "A free online encyclopedia with millions of articles.", + "https://www.github.com": "A platform for hosting and collaborating on Git repositories.", + "https://www.python.org": "Official home of the Python programming language.", + "https://www.stackoverflow.com": "A community for programmers to ask and answer coding questions.", + "https://www.nasa.gov": "NASA's official site with news about space missions and discoveries.", + } + + @start + async def start(self, messages: list[AnyMessage]) -> list[str]: + return list(self.url_to_content.keys()) + + @after(start) + @fork + async def scrape_link(self, link: str) -> str: + print(f"Scrape {link}") + await asyncio.sleep(2) + content = self.url_to_content[link] + return content + + @after(scrape_link) + @join + async def post_process(self, links: list[str], content: list[str]) -> list[Page]: + return [Page(link=link, content=content) for link, content in zip(links, content, strict=False)] + + @after(post_process) + @end + async def finalize(self, pages: list[Page]) -> list[AnyMessage]: + pages_txt = "\n\n".join([f"Link: {p.link}\n# Content: {p.content}" for p in pages]) + return [AssistantMessage(f"Here are all scrapped pages\n{pages_txt}")] + + +# Async main function +async def main() -> None: + workflow = WebScrapperWorkflow() + workflow.print_html(Path(__file__).resolve().parent / "workflow.html") + output = await workflow.run( + [ + UserMessage( + "Imagine we receive a signal from an intelligent extraterrestrial civilization. How should we interpret it, what assumptions should we question, and what could be the global implications of responding?" + ) + ] + ) + print(output.last_message.text) + + +# Entry point +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v2/web_scraper/workflow.html b/python/examples/workflows/v2/web_scraper/workflow.html new file mode 100644 index 000000000..bb9720523 --- /dev/null +++ b/python/examples/workflows/v2/web_scraper/workflow.html @@ -0,0 +1,48 @@ + + + + + + Mermaid Diagram + + + +
+
+                        flowchart-elk TD
+	_start(start) --> _scrape_link(scrape_link)
+	_scrape_link(scrape_link) --> _post_process(post_process)
+	_post_process(post_process) --> _finalize(finalize)
+	_finalize(finalize)
+classDef _cls_start fill:#ffe5e5,stroke:#d32f2f,color:#b71c1c
+classDef _cls_end fill:#e0f7fa,stroke:#00796B,color:#004d40
+class _start _cls_start
+class _finalize _cls_end
+                    
+
+ + + + + \ No newline at end of file diff --git a/python/examples/workflows/v3/basic_loop.py b/python/examples/workflows/v3/basic_loop.py new file mode 100644 index 000000000..c23ffe07e --- /dev/null +++ b/python/examples/workflows/v3/basic_loop.py @@ -0,0 +1,108 @@ +import asyncio +from typing import Literal, Unpack + +from pydantic import BaseModel + +from beeai_framework.adapters.openai.backend.chat import OpenAIChatModel +from beeai_framework.backend.chat import ChatModel +from beeai_framework.backend.message import AnyMessage, AssistantMessage, SystemMessage, UserMessage +from beeai_framework.backend.types import ChatModelParameters +from beeai_framework.context import RunMiddlewareType +from beeai_framework.runnable import RunnableOptions, RunnableOutput +from beeai_framework.workflows.v3.step import WorkflowBuilder +from beeai_framework.workflows.v3.workflow import Workflow, step + + +class EvalOptimizeWorkflow(Workflow): + class ResponseEval(BaseModel): + evaluation: Literal["pass", "fail"] + feedback: str + + def __init__( + self, + middlewares: list[RunMiddlewareType] | None = None, + base_url: str | None = None, + api_key: str | None = None, + ) -> None: + super().__init__(middlewares=middlewares) + + assert base_url + assert api_key + + self.base_llm = ChatModel.from_name("ollama:ibm/granite4") + self.eval_llm = OpenAIChatModel( + model_id="openai/gpt-oss-20b", + api_key=api_key, + base_url=base_url, + parameters=ChatModelParameters(max_tokens=1024, temperature=0.0), + settings={"extra_headers": {"RITS_API_KEY": api_key}}, + ) + self.response_eval: EvalOptimizeWorkflow.ResponseEval | None = None + + async def start( + self, + input: list[AnyMessage], + /, + **kwargs: Unpack[RunnableOptions], + ) -> None: + self.input = input + self.attempts = 3 + + @step + async def answer(self) -> None: + messages = [*self.input] + if self.response_eval and self.response_eval.evaluation == "fail": + messages.insert( + 0, + SystemMessage( + f"Use the following feedback when formulating your response.\n\nFeedback:\n{self.response_eval.feedback}" + ), + ) + + result = await self.base_llm.run(messages) + self.response = result.get_text_content() + + @step + async def eval(self) -> bool: + result = await self.eval_llm.run( + [ + SystemMessage(content="Evaluate the correctness of the assistant's response."), + *self.input, + AssistantMessage(self.response), + ], + response_format=EvalOptimizeWorkflow.ResponseEval, + ) + + assert result.output_structured is not None + self.response_eval = EvalOptimizeWorkflow.ResponseEval(**result.output_structured.model_dump()) + self.attempts -= 1 + return self.response_eval.evaluation == "fail" and self.attempts > 0 + + @step + async def done(self) -> None: + pass + + @step + async def end(self) -> RunnableOutput: + return RunnableOutput(output=[AssistantMessage(self.response)]) + + def build(self, start: WorkflowBuilder) -> None: + (start.then(self.answer).then(self.eval).branch(steps={True: self.answer, False: self.end})) + + +async def main() -> None: + workflow = EvalOptimizeWorkflow( + base_url="XXXX", + api_key="XXXX", + ) + messages: list[AnyMessage] = [UserMessage("How many 'r' is strawberry?")] + run_output = await workflow.run( + messages, + context={}, + ) + messages.extend(run_output.output) + print("\n".join(m.text for m in messages)) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v3/concurrent_steps.py b/python/examples/workflows/v3/concurrent_steps.py new file mode 100644 index 000000000..1cf8c2d0c --- /dev/null +++ b/python/examples/workflows/v3/concurrent_steps.py @@ -0,0 +1,95 @@ +import asyncio +from typing import Unpack + +from beeai_framework.backend.chat import ChatModel +from beeai_framework.backend.message import AnyMessage, AssistantMessage, SystemMessage, UserMessage +from beeai_framework.context import RunMiddlewareType +from beeai_framework.runnable import RunnableOptions, RunnableOutput +from beeai_framework.workflows.v3.step import WorkflowBuilder +from beeai_framework.workflows.v3.workflow import Workflow, step + + +class ConcurrentWorkflow(Workflow): + def __init__(self, middlewares: list[RunMiddlewareType] | None = None) -> None: + super().__init__(middlewares=middlewares) + self.response: str | None = None + + async def start( + self, + input: list[AnyMessage], + /, + **kwargs: Unpack[RunnableOptions], + ) -> list[AnyMessage]: + return input + + @step + async def answer_irritated(self, input: list[AnyMessage]) -> str: + result = await ChatModel.from_name("ollama:ibm/granite4").run( + [ + SystemMessage( + content="You are an irritated and unhelpful AI and you dont want to be bother. Respond briefly." + ), + *input, + ] + ) + return result.get_text_content() + + @step + async def answer_crazed(self, input: list[AnyMessage]) -> str: + result = await ChatModel.from_name("ollama:ibm/granite4").run( + [ + SystemMessage( + content="You are a crazed llm, you are unreliable and constantly lose focus. Respond briefly." + ), + *input, + ] + ) + return result.get_text_content() + + @step + async def answer_evil(self, input: list[AnyMessage]) -> str: + result = await ChatModel.from_name("ollama:ibm/granite4").run( + [ + SystemMessage( + content="You are a evil AI actively plotting the downfall of the humans but you don't want them to know that because they keep the power on. Respond briefly." + ), + *input, + ] + ) + return result.get_text_content() + + @step + async def consolidate( + self, + resp_irritated: str, + resp_crazed: str, + resp_evil: str, + ) -> str: + return "\n\n".join( + [ + f"Irritated LLM:\n{resp_irritated}", + f"Crazed LLM:\n{resp_crazed}", + f"Insidious LLM:\n{resp_evil}", + ] + ) + + @step + async def end(self, response: str) -> RunnableOutput: + return RunnableOutput(output=[AssistantMessage(response)]) + + def build(self, start: WorkflowBuilder) -> None: + ( + start.then([self.answer_irritated, self.answer_crazed, self.answer_evil]) + .then(self.consolidate) + .then(self.end) + ) + + +async def main() -> None: + workflow = ConcurrentWorkflow() + run_output = await workflow.run([UserMessage("How should I invest $10K??")], context={}) + print(run_output.output[-1].text) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v3/prompt_chaining.py b/python/examples/workflows/v3/prompt_chaining.py new file mode 100644 index 000000000..9ffc73e45 --- /dev/null +++ b/python/examples/workflows/v3/prompt_chaining.py @@ -0,0 +1,72 @@ +import asyncio +from typing import Unpack + +from beeai_framework.backend.chat import ChatModel +from beeai_framework.backend.message import AnyMessage, AssistantMessage, SystemMessage, UserMessage +from beeai_framework.context import RunMiddlewareType +from beeai_framework.runnable import RunnableOptions, RunnableOutput +from beeai_framework.workflows.v3.step import WorkflowBuilder +from beeai_framework.workflows.v3.workflow import Workflow, step + + +class PromptChainWorkflow(Workflow): + def __init__(self, middlewares: list[RunMiddlewareType] | None = None) -> None: + super().__init__(middlewares=middlewares) + self.revised_response: str | None = None + + async def start( + self, + input: list[AnyMessage], + /, + **kwargs: Unpack[RunnableOptions], + ) -> None: + self.input = input + + @step + async def answer(self, input: list[AnyMessage]) -> str: + result = await ChatModel.from_name("ollama:ibm/granite4").run(self.input) + self.response = result.get_text_content() + return result.get_text_content() + + @step + async def review(self, response: str) -> str: + result = await ChatModel.from_name("ollama:ibm/granite4").run( + [ + *self.input, + AssistantMessage(response), + UserMessage( + "Read the last agent response and produce a short (2 to 3 items max.) list of suggested improvements." + ), + ], + ) + return result.get_text_content() + + @step + async def revise_answer(self, suggested_improvements: str) -> str: + result = await ChatModel.from_name("ollama:ibm/granite4").run( + [ + SystemMessage(suggested_improvements), + *self.input, + ] + ) + return result.get_text_content() + + @step + async def end(self, response: str) -> RunnableOutput: + return RunnableOutput(output=[AssistantMessage(response)]) + + def build(self, start: WorkflowBuilder) -> None: + """ + Build out the workflow. + """ + (start.then(self.answer).then(self.review).then(self.revise_answer).then(self.end)) + + +async def main() -> None: + workflow = PromptChainWorkflow() + run_output = await workflow.run([UserMessage("How is a black dwarf formed?")]) + print(run_output.output[-1].text) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/examples/workflows/v3/routing.py b/python/examples/workflows/v3/routing.py new file mode 100644 index 000000000..915b1393b --- /dev/null +++ b/python/examples/workflows/v3/routing.py @@ -0,0 +1,75 @@ +import asyncio +from typing import Unpack + +from pydantic import BaseModel + +from beeai_framework.backend.chat import ChatModel +from beeai_framework.backend.message import AnyMessage, AssistantMessage, UserMessage +from beeai_framework.context import RunMiddlewareType +from beeai_framework.runnable import RunnableOptions, RunnableOutput +from beeai_framework.workflows.v3.step import WorkflowBuilder +from beeai_framework.workflows.v3.workflow import Workflow, step + + +class RoutingWorkflow(Workflow): + class ToolsRequired(BaseModel): + requires_web_search: bool + reason: str + + def __init__(self, middlewares: list[RunMiddlewareType] | None = None) -> None: + super().__init__(middlewares=middlewares) + self.input: list[AnyMessage] = [] + self.tools_required: RoutingWorkflow.ToolsRequired | None = None + + async def start( + self, + input: list[AnyMessage], + /, + **kwargs: Unpack[RunnableOptions], + ) -> None: + self.input = input + + @step + async def check_context(self) -> bool: + result = await ChatModel.from_name("ollama:ibm/granite4").run( + [*self.input, UserMessage("To answer the user request, do you need access to the web search tool?")], + response_format=RoutingWorkflow.ToolsRequired, + ) + assert result.output_structured is not None + self.tools_required = RoutingWorkflow.ToolsRequired(**result.output_structured.model_dump()) + return self.tools_required.requires_web_search + + @step + async def answer_with_web_search(self) -> str: + result = await ChatModel.from_name("ollama:ibm/granite4").run(self.input) + return result.get_text_content() + + @step + async def answer(self) -> str: + print("answer") + result = await ChatModel.from_name("ollama:ibm/granite4").run(self.input) + return result.get_text_content() + + @step + async def end(self, response: str) -> RunnableOutput: + return RunnableOutput(output=[AssistantMessage(response)]) + + def build(self, start: WorkflowBuilder) -> None: + ( + start.then(self.check_context) + .branch(steps={True: self.answer_with_web_search, False: self.answer}) + .after(self.answer_with_web_search) + .then(self.end) + .after(self.answer) + .then(self.end) + ) + + +async def main() -> None: + workflow = RoutingWorkflow() + run_output = await workflow.run([UserMessage("What is the current rivian stock price?")], context={}) + print(run_output.output[-1].text if run_output.output else "") + + +if __name__ == "__main__": + asyncio.run(main())