Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

# A2A and Remote
/autogen/a2a/ @Lancetnik
/autogen/remote/ @Lancetnik
/test/a2a/ @Lancetnik
/test/remote/ @Lancetnik
/website/docs/user-guide/a2a/ @Lancetnik
Expand Down
3 changes: 1 addition & 2 deletions autogen/a2a/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
stacklevel=2,
)

from autogen.remote.httpx_client_factory import HttpxClientFactory

from .agent_executor import AutogenAgentExecutor
from .client import A2aRemoteAgent
from .httpx_client_factory import MockClient
from .remote import HttpxClientFactory
from .server import A2aAgentServer, CardSettings

__all__ = (
Expand Down
48 changes: 31 additions & 17 deletions autogen/a2a/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

from autogen import ConversableAgent
from autogen.doc_utils import export_module
from autogen.remote.agent_service import AgentService

from .utils import request_message_from_a2a, response_message_to_a2a
from .remote import AgentService, ServiceResponse
from .utils import make_artifact, make_input_required_message, make_working_message, request_message_from_a2a


@export_module("autogen.a2a")
Expand Down Expand Up @@ -50,28 +50,42 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non

updater = TaskUpdater(event_queue, task.id, task.context_id)

final_message: ServiceResponse | None = None
try:
result = await self.agent(request_message_from_a2a(context.message))
async for response in self.agent(request_message_from_a2a(context.message)):
if response.input_required:
await updater.requires_input(
message=make_input_required_message(
context_id=task.context_id,
task_id=task.id,
text=response.input_required,
context=response.context,
),
final=True,
)
return

else:
await updater.update_status(
message=make_working_message(
message=response.message,
context_id=task.context_id,
task_id=task.id,
),
state=TaskState.working,
)

final_message = response

except Exception as e:
raise ServerError(error=InternalError()) from e

artifact, messages, input_required_msg = response_message_to_a2a(result, context.context_id, task.id)

# publish local chat history events
for message in messages:
await updater.update_status(
state=TaskState.working,
message=message,
if final_message:
artifact = make_artifact(
message=final_message.message,
context=final_message.context,
)

# publish input required event
if input_required_msg:
await updater.requires_input(message=input_required_msg, final=True)
return

# publish the task final result event
if artifact:
await updater.add_artifact(
artifact_id=artifact.artifact_id,
name=artifact.name,
Expand Down
3 changes: 1 addition & 2 deletions autogen/a2a/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
from autogen.events.agent_events import TerminationEvent
from autogen.io.base import IOStream
from autogen.oai.client import OpenAIWrapper
from autogen.remote.httpx_client_factory import ClientFactory, EmptyClientFactory
from autogen.remote.protocol import RequestMessage, ResponseMessage

from .errors import A2aAgentNotFoundError, A2aClientError
from .remote import ClientFactory, EmptyClientFactory, RequestMessage, ResponseMessage
from .utils import (
request_message_to_a2a,
response_message_from_a2a_message,
Expand Down
2 changes: 1 addition & 1 deletion autogen/a2a/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# SPDX-License-Identifier: Apache-2.0


from autogen.remote.errors import RemoteAgentError, RemoteAgentNotFoundError
from .remote import RemoteAgentError, RemoteAgentNotFoundError


class A2aClientError(RemoteAgentError):
Expand Down
3 changes: 2 additions & 1 deletion autogen/a2a/httpx_client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from httpx import MockTransport, Request, Response

from autogen.doc_utils import export_module
from autogen.remote.httpx_client_factory import HttpxClientFactory

from .remote import HttpxClientFactory


@export_module("autogen.a2a")
Expand Down
25 changes: 25 additions & 0 deletions autogen/a2a/remote/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0

# Public façade for the remote-agent building blocks used by the a2a package.
# Everything re-exported here requires httpx, so fail fast with an actionable
# message if it is missing rather than surfacing a confusing ImportError later.
try:
    import httpx  # noqa: F401
except ImportError as e:
    raise ImportError("httpx is not installed. Please install it with:\npip install httpx") from e

from .agent_service import AgentService
from .errors import RemoteAgentError, RemoteAgentNotFoundError
from .httpx_client_factory import ClientFactory, EmptyClientFactory, HttpxClientFactory
from .protocol import RequestMessage, ResponseMessage, ServiceResponse

# Explicit public API of this subpackage (kept alphabetical).
__all__ = (
    "AgentService",
    "ClientFactory",
    "EmptyClientFactory",
    "HttpxClientFactory",
    "RemoteAgentError",
    "RemoteAgentNotFoundError",
    "RequestMessage",
    "ResponseMessage",
    "ServiceResponse",
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0

import warnings
from collections.abc import AsyncGenerator
from typing import Any, Literal, cast

from autogen.agentchat import ConversableAgent
Expand All @@ -15,20 +16,21 @@
from autogen.events.base_event import BaseEvent
from autogen.io.base import AsyncIOStreamProtocol

from .protocol import RemoteService, RequestMessage, ResponseMessage, get_tool_names
from .protocol import RequestMessage, ServiceResponse, get_tool_names


class AgentService(RemoteService):
class AgentService:
    """Adapter that exposes a local ConversableAgent as a remote agent service."""

    def __init__(self, agent: ConversableAgent) -> None:
        # Mirror the agent's name on the service so callers can route by name.
        self.name = agent.name
        self.agent = agent

async def __call__(self, state: RequestMessage) -> ResponseMessage | None:
async def __call__(self, state: RequestMessage) -> AsyncGenerator[ServiceResponse, None]:
out_message: dict[str, Any] | None
if guardrail_result := self.agent.run_input_guardrails(state.messages):
# input guardrail activated by initial messages
_, out_message = normilize_message_to_oai(guardrail_result.reply, self.agent.name, role="assistant")
return ResponseMessage(messages=[out_message], context=state.context)
yield ServiceResponse(message=out_message, context=state.context)
return

context_variables = ContextVariables(state.context)
tool_executor = self._make_tool_executor(context_variables)
Expand All @@ -38,24 +40,19 @@ async def __call__(self, state: RequestMessage) -> ResponseMessage | None:
messages = state.messages + local_history

stream = HITLStream()
# check HITL required
await self.agent.a_check_termination_and_human_reply(messages, iostream=stream)
if stream.is_input_required:
return ResponseMessage(
messages=local_history,
context=context_variables.data or None,
yield ServiceResponse(
input_required=stream.input_prompt,
context=context_variables.data or None,
)
return

reply = await self.agent.a_generate_reply(
messages,
exclude=(
ConversableAgent.check_termination_and_human_reply,
ConversableAgent.a_check_termination_and_human_reply,
ConversableAgent.generate_oai_reply,
ConversableAgent.a_generate_oai_reply,
),
)
# check code execution
_, reply = self.agent.generate_code_execution_reply(messages)

# generate LLM reply
if not reply:
_, reply = await self.agent.a_generate_oai_reply(
messages,
Expand All @@ -65,14 +62,20 @@ async def __call__(self, state: RequestMessage) -> ResponseMessage | None:
should_continue, out_message = self._add_message_to_local_history(reply, role="assistant")
if out_message:
local_history.append(out_message)
yield ServiceResponse(
message=out_message,
context=context_variables.data or None,
)
if not should_continue:
break
out_message = cast(dict[str, Any], out_message)

called_tools = get_tool_names(out_message.get("tool_calls", []))
if state.client_tool_names.intersection(called_tools):
break # return client tool execution command back to client
# if AI called a client tool, return the tool execution command back to client
break

# execute local tools
tool_result, updated_context_variables, return_to_user = self._try_execute_local_tool(
tool_executor, out_message
)
Expand All @@ -83,25 +86,21 @@ async def __call__(self, state: RequestMessage) -> ResponseMessage | None:
should_continue, out_message = self._add_message_to_local_history(tool_result, role="tool")
if out_message:
local_history.append(out_message)
yield ServiceResponse(
message=out_message,
context=context_variables.data or None,
)

if return_to_user:
return ResponseMessage(
messages=local_history,
context=context_variables.data or None,
yield ServiceResponse(
input_required="Please, provide additional information:\n",
context=context_variables.data or None,
)
return

if not should_continue:
break

if not local_history:
return None

return ResponseMessage(
messages=local_history,
context=context_variables.data or None,
)

def _add_message_to_local_history(
self, message: str | dict[str, Any] | None, role: str
) -> tuple[Literal[True], dict[str, Any]] | tuple[Literal[False], dict[str, Any] | None]:
Expand Down
File renamed without changes.
14 changes: 5 additions & 9 deletions autogen/remote/protocol.py → autogen/a2a/remote/protocol.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Protocol
from typing import Any

from pydantic import BaseModel, Field

Expand All @@ -23,14 +23,10 @@ class ResponseMessage(AgentBusMessage):
input_required: str | None = None


class RemoteService(Protocol):
"""Interface to make AgentBus compatible with non AG2 systems."""

name: str

async def __call__(self, state: RequestMessage) -> ResponseMessage | None:
"""Executable that consumes Conversation State and returns a new state."""
...
class ServiceResponse(BaseModel):
    """One item streamed by ``AgentService`` while it processes a request.

    A response either carries a produced chat ``message`` or an
    ``input_required`` prompt asking the client for human input; ``context``
    snapshots context variables to propagate back to the caller.
    """

    # OAI-style chat message produced during the run, if any.
    message: dict[str, Any] | None = None
    # Context-variable snapshot to return to the client (None when empty).
    context: dict[str, Any] | None = None
    # Prompt text when the service is paused waiting for client/human input.
    input_required: str | None = None


def get_tool_names(tools: list[dict[str, Any]]) -> set[str]:
Expand Down
File renamed without changes.
77 changes: 37 additions & 40 deletions autogen/a2a/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from a2a.utils import get_message_text, new_agent_parts_message, new_artifact
from a2a.utils.message import new_agent_text_message

from autogen.remote.protocol import RequestMessage, ResponseMessage
from .remote import RequestMessage, ResponseMessage

AG2_METADATA_KEY_PREFIX = "ag2_"
CLIENT_TOOLS_KEY = f"{AG2_METADATA_KEY_PREFIX}client_tools"
Expand Down Expand Up @@ -125,51 +125,48 @@ def response_message_from_a2a_message(message: Message) -> ResponseMessage | Non
)


def response_message_to_a2a(
result: ResponseMessage | None,
context_id: str | None,
task_id: str | None,
) -> tuple[Artifact | None, list[Message], Message | None]:
if not result:
return (
new_artifact(
name="result",
parts=[],
description=None,
),
[],
None,
)

message_history = [
new_agent_parts_message(
parts=[message_to_part(m) for m in result.messages],
context_id=context_id,
task_id=task_id,
),
]

if result.input_required is not None:
input_message = new_agent_text_message(
text=result.input_required,
context_id=context_id,
task_id=task_id,
)
if result.context:
input_message.metadata = {CONTEXT_KEY: result.context}

return None, message_history, input_message

def make_artifact(
    message: dict[str, Any] | None,
    context: dict[str, Any] | None = None,
) -> Artifact:
    """Build the final result ``Artifact`` for a completed task.

    Args:
        message: The final OAI-style chat message of the run; when ``None``
            the artifact is created with no parts.
        context: Context variables to attach under the AG2 context metadata
            key so the client can restore them.

    Returns:
        A new ``Artifact`` named ``"result"``.
    """
    artifact = new_artifact(
        name="result",
        parts=[message_to_part(message)] if message else [],
        description=None,
    )

    if context:
        artifact.metadata = {CONTEXT_KEY: context}

    return artifact


def make_working_message(
    message: dict[str, Any] | None,
    context_id: str,
    task_id: str,
) -> Message:
    """Wrap an in-progress chat message as an A2A agent parts ``Message``.

    A ``None`` message yields a Message with an empty parts list.
    """
    parts = [] if message is None else [message_to_part(message)]
    return new_agent_parts_message(parts=parts, context_id=context_id, task_id=task_id)


def make_input_required_message(
    text: str,
    context_id: str,
    task_id: str,
    context: dict[str, Any] | None = None,
) -> Message:
    """Build an A2A text ``Message`` prompting the client for more input.

    When *context* is provided it is attached under the AG2 context metadata
    key so the client can restore context variables on resume.
    """
    msg = new_agent_text_message(
        text=text,
        context_id=context_id,
        task_id=task_id,
    )
    if context:
        msg.metadata = {CONTEXT_KEY: context}

    return msg


def message_to_part(message: dict[str, Any]) -> Part:
Expand Down
Loading
Loading