Task Orchestration and Execution Framework
apflow is a unified task orchestration and execution framework that supports many task types behind a single set of specifications. The core is pure orchestration with no LLM dependencies; CrewAI support is optional.
Core includes:
- Task orchestration specifications (TaskManager)
- Core interfaces (ExecutableTask, BaseTask, TaskStorage)
- Storage (DuckDB default, PostgreSQL optional)
- NO CrewAI dependency (available via [crewai] extra)
Optional features:
- CrewAI Support [crewai]: LLM-based agent crews via CrewaiExecutor (task executor implementation)
- HTTP/REST Executor [http]: Remote API calls via RestExecutor (task executor implementation)
- SSH Executor [ssh]: Remote command execution via SSH (task executor implementation)
- Docker Executor [docker]: Containerized command execution (task executor implementation)
- gRPC Executor [grpc]: gRPC service calls (task executor implementation)
- WebSocket Executor: Bidirectional WebSocket communication (task executor implementation)
- apflow API Executor: Inter-instance API calls for distributed execution (task executor implementation)
- MCP Executor: Model Context Protocol executor for accessing external tools and data sources (task executor implementation)
- MCP Server [a2a]: MCP (Model Context Protocol) server exposing task orchestration as MCP tools and resources
- LLM Executor [llm]: Direct LLM interaction via LiteLLM (supports OpenAI, Anthropic, Gemini, etc.)
- A2A Protocol Server [a2a]: Serves the A2A Protocol, the standard protocol for agent-to-agent communication
- CLI Tools [cli]: Command-line interface
Note: All built-in and future executors are implementations of the ExecutableTask interface; each handles a different type of task execution (LLM, HTTP, SSH, etc.).
- TaskManager: Task tree orchestration, dependency management, priority scheduling
- Unified Execution Specification: All task types are unified through the ExecutableTask interface
All task executors implement the ExecutableTask interface:
- Custom Tasks (core): Users implement ExecutableTask for their own task types
- CrewaiExecutor [crewai]: LLM-based task execution via CrewAI (built-in executor)
- RestExecutor [http]: HTTP/REST API calls with authentication and retry (built-in executor)
- SshExecutor [ssh]: Remote command execution via SSH (built-in executor)
- DockerExecutor [docker]: Containerized command execution (built-in executor)
- GrpcExecutor [grpc]: gRPC service calls (built-in executor)
- WebSocketExecutor: Bidirectional WebSocket communication (built-in executor)
- ApFlowApiExecutor: Inter-instance API calls for distributed execution (built-in executor)
- McpExecutor: Model Context Protocol executor for accessing external tools and data sources (built-in executor)
- GenerateExecutor: Generate task tree JSON arrays from natural language requirements using LLM (built-in executor)
- LLMExecutor [llm]: Direct LLM interaction via LiteLLM (supports 100+ providers)
- BatchCrewaiExecutor [crewai]: Batch orchestration container (batches multiple crews)
- Storage: Task state persistence (DuckDB default, PostgreSQL optional)
- Unified External API: A2A Protocol Server (HTTP, SSE, WebSocket) [a2a]
- Real-time Progress Streaming: Streaming support via A2A Protocol
- CLI Tools: Command-line interface [cli]
- A2A Protocol: The framework adopts the A2A (Agent-to-Agent) Protocol as its standard for agent communication. A2A is a mature, production-ready specification designed specifically for AI agent systems, providing:
- A standardized agent-to-agent communication interface
- Streaming task execution support
- Agent capability description mechanism (AgentCard, AgentSkill)
- Multiple transport methods (HTTP, SSE, WebSocket)
- Task management and status tracking
- JWT authentication support
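In practice, an agent advertises its capabilities through an A2A AgentCard, a small JSON document. The sketch below is illustrative only: the field names follow the published A2A specification, while all values (and the exact card apflow serves) are assumptions.

```python
# Illustrative AgentCard content (field names follow the A2A spec; all
# values are placeholders, and apflow's actual card may differ)
agent_card = {
    "name": "apflow-orchestrator",
    "description": "Task orchestration exposed as an A2A agent",
    "url": "http://localhost:8000",
    "version": "1.0.0",
    "capabilities": {"streaming": True},
    "skills": [
        {
            "id": "run_task_tree",
            "name": "Run task tree",
            "description": "Execute an orchestrated tree of tasks",
        }
    ],
}
```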
```bash
pip install apflow
```

Includes: task orchestration specifications, core interfaces, storage (DuckDB).
Excludes: CrewAI, batch execution, API server, CLI tools.

```bash
# CrewAI LLM task support (includes batch)
pip install apflow[crewai]
# Includes: CrewaiExecutor for LLM-based agent crews
# BatchCrewaiExecutor for atomic batch execution of multiple crews
# A2A Protocol Server (Agent-to-Agent communication protocol)
pip install apflow[a2a]
# Run A2A server: python -m apflow.api.main
# Or: apflow-server (CLI command)
# CLI tools
pip install apflow[cli]
# Run CLI: apflow
# PostgreSQL storage
pip install apflow[postgres]
# SSH executor (remote command execution)
pip install apflow[ssh]
# Docker executor (containerized execution)
pip install apflow[docker]
# gRPC executor (gRPC service calls)
pip install apflow[grpc]
# LLM support (LiteLLM, supports 100+ providers)
pip install apflow[llm]
# Everything (includes all extras)
pip install apflow[all]
```

Get started with apflow in minutes!

```bash
# Minimal installation (core only)
pip install apflow
# With all features
pip install apflow[all]
```

Using Task Orchestration Specifications:

```python
from apflow import TaskManager, TaskTreeNode, create_session
# Create database session and task manager (core)
db = create_session() # or: db = get_default_session()
task_manager = TaskManager(db)
# Create task tree (task orchestration)
# Use task_repository to create tasks
root_task = await task_manager.task_repository.create_task(
    name="root_task",
    user_id="user_123",
    priority=2
)
child_task = await task_manager.task_repository.create_task(
    name="custom_task",  # Task name corresponds to a specific executor
    user_id="user_123",
    parent_id=root_task.id,
    dependencies=[],  # Dependency relationships
    inputs={"url": "https://example.com"}
)
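# Hypothetical illustration (not from the docs): a task gated on child_task
# would list its id in dependencies, using the same create_task call, e.g.:
# report_task = await task_manager.task_repository.create_task(
#     name="custom_task",
#     user_id="user_123",
#     parent_id=root_task.id,
#     dependencies=[child_task.id],  # runs only after child_task completes
#     inputs={"url": "https://example.com/report"},
# )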
# Build task tree and execute (task orchestration core)
task_tree = TaskTreeNode(root_task)
task_tree.add_child(TaskTreeNode(child_task))
result = await task_manager.distribute_task_tree(task_tree)
```

Creating Custom Tasks (Traditional External Service Calls):

```python
from apflow import ExecutableTask
from typing import Dict, Any
import aiohttp
class APICallTask(ExecutableTask):
    """Traditional external API call task"""

    id = "api_call_task"
    name = "API Call Task"
    description = "Call external API service"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # POST the optional payload to the given endpoint and return its JSON body
        async with aiohttp.ClientSession() as session:
            async with session.post(inputs["url"], json=inputs.get("data")) as response:
                result = await response.json()
        return {"status": "completed", "result": result}

    def get_input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "API endpoint"},
                "data": {"type": "object", "description": "Request data"}
            }
        }
```
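A custom task can also be exercised directly, outside the orchestrator. This is a minimal sketch that uses only the APICallTask class defined above; the endpoint URL is a placeholder.

```python
import asyncio

async def main():
    task = APICallTask()
    # Placeholder endpoint: any service that accepts a JSON POST will do
    result = await task.execute({"url": "https://example.com/api", "data": {"q": "ping"}})
    print(result["status"])

asyncio.run(main())
```

Executing CrewAI (LLM) Tasks:

```python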
# Requires: pip install apflow[crewai]
from apflow.extensions.crewai import CrewaiExecutor
# CrewAI task execution
crew = CrewaiExecutor(
    name="Analysis Crew",
    agents=[{"role": "Analyst", "goal": "Analyze data"}],
    tasks=[{"description": "Analyze input", "agent": "Analyst"}]
)
result = await crew.execute(inputs={...})
```

Using BatchCrewaiExecutor to batch multiple crews (atomic operation):

```python
# Requires: pip install apflow[crewai]
from apflow.extensions.crewai import BatchCrewaiExecutor, CrewaiExecutor
# BatchCrewaiExecutor is a batch container: it executes multiple crews as one atomic operation
batch = BatchCrewaiExecutor(
    id="my_batch",
    name="Batch Analysis",
    works={
        "data_collection": {
            "agents": [{"role": "Collector", "goal": "Collect data"}],
            "tasks": [{"description": "Collect data", "agent": "Collector"}]
        },
        "data_analysis": {
            "agents": [{"role": "Analyst", "goal": "Analyze data"}],
            "tasks": [{"description": "Analyze data", "agent": "Analyst"}]
        }
    }
)

# All crews execute sequentially and their results are merged;
# if any crew fails, the entire batch fails (atomic)
result = await batch.execute(inputs={...})
```

```bash
# Run tasks (standard mode - recommended)
apflow run flow --tasks '[{"id": "task1", "name": "Task 1", "schemas": {"method": "executor_id"}, "inputs": {"key": "value"}}]'
# Or legacy mode (executor ID + inputs)
apflow run flow executor_id --inputs '{"key": "value"}'
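# Hypothetical multi-task example (illustrative only): a two-task tree where
# task2 waits for task1; the executor ids are placeholders and the
# "dependencies" field mirrors the Python API shown earlier
apflow run flow --tasks '[{"id": "task1", "name": "Fetch", "schemas": {"method": "rest_executor"}, "inputs": {"url": "https://example.com"}}, {"id": "task2", "name": "Summarize", "schemas": {"method": "llm_executor"}, "dependencies": ["task1"], "inputs": {"prompt": "Summarize the page"}}]'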
# Start API server
apflow serve --port 8000
# Start daemon mode
apflow daemon start
# Stop daemon mode
apflow daemon stop
```

The [a2a] extra provides an A2A (Agent-to-Agent) Protocol server built on Starlette/FastAPI.

```python
from apflow.api import create_app
# Create A2A protocol server app
app = create_app()
# Run with: uvicorn app:app --port 8000
# Or use the entry point: apflow-server
```

Note: The current [a2a] extra focuses on A2A protocol support. Future versions may include additional FastAPI REST API endpoints for direct HTTP access without the A2A protocol.
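If you prefer to keep everything in Python, the app returned by create_app can also be served programmatically. A minimal sketch: only create_app comes from apflow; the host and port are placeholders, and uvicorn must be installed.

```python
# Serve the A2A app with uvicorn from Python instead of the command line.
# create_app comes from apflow; everything else is standard uvicorn usage.
import uvicorn
from apflow.api import create_app

app = create_app()

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)
```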
```
┌───────────────────────────────────────────────────────────────┐
│             Unified External API Interface Layer              │
│  - A2A Protocol Server (HTTP/SSE/WebSocket) [a2a]             │
│  - CLI Tools [cli]                                            │
└───────────────────────────────────────────────────────────────┘
                               ↓
┌───────────────────────────────────────────────────────────────┐
│         Task Orchestration Specification Layer (CORE)         │
│  - TaskManager: Task tree orchestration, dependency           │
│    management, priority scheduling                            │
│  - ExecutableTask: Unified task interface                     │
└───────────────────────────────────────────────────────────────┘
                               ↓
┌───────────────────────────────────────────────────────────────┐
│                     Task Execution Layer                      │
│  - Custom Tasks [core]: ExecutableTask implementations        │
│    • Traditional external service calls (API, DB, etc.)       │
│    • Automated task services (scheduled tasks, workflows)     │
│  - CrewaiExecutor [crewai]: CrewAI (LLM) task execution       │
│  - BatchCrewaiExecutor [crewai]: Batch task orchestration     │
└───────────────────────────────────────────────────────────────┘
                               ↓
┌───────────────────────────────────────────────────────────────┐
│                   Supporting Features Layer                   │
│  - Storage: Task state persistence (DuckDB/PostgreSQL)        │
│  - Streaming: Real-time progress updates                      │
└───────────────────────────────────────────────────────────────┘
```
See docs/architecture/DIRECTORY_STRUCTURE.md for detailed directory structure and module descriptions.
Installation Strategy:
- `pip install apflow`: Core library only (execution, base, storage, utils) - NO CrewAI
- `pip install apflow[crewai]`: Core + CrewAI support (includes BatchCrewaiExecutor)
- `pip install apflow[a2a]`: Core + A2A Protocol Server
- `pip install apflow[cli]`: Core + CLI tools
- `pip install apflow[all]`: Full installation (all features)
Note: For examples and learning templates, see the test cases in tests/integration/ and tests/extensions/.
Quick Links:
- Full Documentation - Complete documentation index
- Quick Start - Get running in 10 minutes
- Tutorials - Step-by-step tutorials
- Guides - Comprehensive guides
- Examples - Practical examples
- API Reference - Complete API documentation
For New Users:
- Start with Getting Started
- Follow the Quick Start Guide
- Try the First Steps Tutorial
For Developers:
For Contributors:
Full documentation is also available at flow-docs.aipartnerup.com.
Contributions are welcome! Please see our Contributing Guide for setup instructions and contribution guidelines.
Apache-2.0
- Documentation: docs/index.md - Complete documentation
- Website: aipartnerup.com
- GitHub: aipartnerup/apflow
- PyPI: apflow
- Issues: GitHub Issues
- Discussions: GitHub Discussions